
[SPARK-21549][CORE] Respect OutputFormats with no output directory provided #19294

Closed
wants to merge 15 commits

Conversation


@szhem (Contributor) commented Sep 20, 2017

What changes were proposed in this pull request?

Fix for the https://issues.apache.org/jira/browse/SPARK-21549 JIRA issue.

Since version 2.2, Spark does not respect OutputFormats with no output path provided. Examples of such formats are the [Cassandra OutputFormat](https://github.com/finn-no/cassandra-hadoop/blob/08dfa3a7ac727bb87269f27a1c82ece54e3f67e6/src/main/java/org/apache/cassandra/hadoop2/AbstractColumnFamilyOutputFormat.java), the [Aerospike OutputFormat](https://github.com/aerospike/aerospike-hadoop/blob/master/mapreduce/src/main/java/com/aerospike/hadoop/mapreduce/AerospikeOutputFormat.java), etc., which have no ability to roll back results already written to an external system on job failure.

Spark requires the output directory so that files can be committed to an absolute output location; that is not the case for output formats which write data to external systems.

This pull request prevents access to the absPathStagingDir method, which causes the error described in SPARK-21549, unless there are files to rename in addedAbsPathFiles.

How was this patch tested?

Unit tests
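For illustration, the shape of the fix can be sketched with a simplified stand-in for `HadoopMapReduceCommitProtocol`. The class below is hypothetical and models only the null-path guard from this PR, not the real Spark class:

```scala
// Hypothetical, simplified stand-in for Spark's HadoopMapReduceCommitProtocol;
// only the null-path guard logic from this PR is modelled here.
class CommitProtocolSketch(jobId: String, path: String) {

  // The fix: only touch the absolute-path staging directory when an
  // output path was actually provided (it is null for OutputFormats
  // that write to external systems such as Cassandra or Aerospike).
  private def hasAbsPathFiles: Boolean = path != null

  private def absPathStagingDir: String = {
    // Mirrors org.apache.hadoop.fs.Path rejecting a null string.
    require(path != null, "Can not create a Path from a null string")
    s"$path/_temporary-$jobId"
  }

  def abortJob(): String =
    if (hasAbsPathFiles) s"deleted $absPathStagingDir"
    else "nothing to clean up" // previously this path threw IllegalArgumentException
}
```

With a null path, `new CommitProtocolSketch("job_1", null).abortJob()` now returns cleanly instead of failing while constructing the staging-directory path.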

…ted to an absolute output location in case of custom output formats
…ted to an absolute location - reformatting imports
  pairs.saveAsNewAPIHadoopDataset(jobConfiguration)
} finally {
  // close to prevent filesystem caching across different tests
  fs.close()
Contributor
Avoid this. Either use FileSystem.newInstance() or skip the close. Given you aren't playing with low-level FS options, it's faster and more efficient to reuse the cached instance.

@szhem (Contributor, Author) Sep 20, 2017

I was counting on indirect filesystem caching, so that the filesystem was exactly the same in the tests as in SparkHadoopWriter, and calling newInstance prevents that. I've now updated the PR to not use the filesystem at all.

  pairs.saveAsHadoopDataset(conf)
} finally {
  // close to prevent filesystem caching across different tests
  fs.close()
Contributor
again, you don't need this.

@szhem (Contributor, Author) Sep 20, 2017

I've updated the PR to not use the filesystem at all.

…s with absolute names to rename in addedAbsPathFiles
@HyukjinKwon (Member)

ok to test

@HyukjinKwon (Member) commented Sep 21, 2017

cc @jiangxb1987, who I believe is interested in this. Without a super close look, it looks like it makes sense.

The actual problem here does indeed look to be path being null (as described in the JIRA):

outputPath = getConf.get("mapreduce.output.fileoutputformat.outputdir")

class HadoopMapReduceCommitProtocol(jobId: String, path: String)

private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId)

java.lang.IllegalArgumentException: Can not create a Path from a null string
  at org.apache.hadoop.fs.Path.checkPathArg(Path.java:123)
  at org.apache.hadoop.fs.Path.<init>(Path.java:135)
  at org.apache.hadoop.fs.Path.<init>(Path.java:89)
  at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.absPathStagingDir(HadoopMapReduceCommitProtocol.scala:58)
  at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:141)
  at org.apache.spark.internal.io.SparkHadoopMapReduceWriter$.write(SparkHadoopMapReduceWriter.scala:106)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1085)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:1085)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:1084)
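The root cause in the trace above can be mimicked without Hadoop at all. The Map-based config and the `mkPath` helper below are hypothetical stand-ins for `Configuration.get` and for the null check inside `org.apache.hadoop.fs.Path`:

```scala
// Stand-in for Configuration.get: Hadoop returns null when
// "mapreduce.output.fileoutputformat.outputdir" was never set, which is
// the case for OutputFormats that do not write to a filesystem directory.
val conf = Map.empty[String, String]
val outputPath: String =
  conf.getOrElse("mapreduce.output.fileoutputformat.outputdir", null)

// Stand-in for the checkPathArg null check inside org.apache.hadoop.fs.Path.
def mkPath(p: String): String = {
  if (p == null)
    throw new IllegalArgumentException("Can not create a Path from a null string")
  p
}

// Before this PR, abortJob unconditionally built the staging-dir path:
val aborted =
  try { mkPath(outputPath); "aborted cleanly" }
  catch { case e: IllegalArgumentException => e.getMessage }
```

The unconditional `mkPath(outputPath)` call is what `absPathStagingDir` effectively did, which is why abortJob failed whenever no output directory was configured.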

SparkQA commented Sep 21, 2017

Test build #82015 has finished for PR 19294 at commit 621c337.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
for ((src, dst) <- filesToMove) {
fs.rename(new Path(src), new Path(dst))
if (addedAbsPathFiles != null && addedAbsPathFiles.nonEmpty) {


Please consider using a common method instead of duplicating the code in the 2 if statements.

Contributor Author
Introduced the method:

/**
 * Checks whether there are files to be committed to an absolute output location.
 */
private def hasAbsPathFiles: Boolean = addedAbsPathFiles != null && addedAbsPathFiles.nonEmpty

…les to be committed to an absolute output location
SparkQA commented Sep 21, 2017

Test build #82020 has finished for PR 19294 at commit 34bb694.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
fs.delete(absPathStagingDir, true)
Contributor

Given the changes being made here, this seems a good place to add the suggestion of SPARK-20045 and make that abort() call resilient to failures, by doing that delete even if the Hadoop committer raised an IOException.

Contributor Author

Wouldn't it be better to fix it in a separate PR?

Contributor

Can do; now that you've got a little mock committer in, someone can just extend it to optionally throw an IOException in abort().

/**
 * Checks whether there are files to be committed to an absolute output location.
 */
private def hasAbsPathFiles: Boolean = addedAbsPathFiles != null && addedAbsPathFiles.nonEmpty
@mridulm (Contributor) Sep 21, 2017

When addedAbsPathFiles is null versus when it is not is slightly confusing. Can we move to using an Option[Map[String, String]] instead?

In the earlier code it was always non-null, but now it becomes optional.

Contributor Author

Good catch, thank you!
According to the FileCommitProtocol, addedAbsPathFiles is always null on the driver, so we would not be able to commit or remove these files there.

Replaced it with

 private def hasAbsPathFiles: Boolean = path != null
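The driver/executor asymmetry behind this change can be sketched with a toy model (hypothetical names, not Spark APIs): setupTask only runs on executors, so on the driver the map stays null and the only reliable driver-side signal is the path itself.

```scala
// Toy model (illustrative only) of why the check moved from the map to the path.
class DriverSideProtocol(path: String) {
  // On the driver this map is never initialized; it is only created in
  // setupTask on executors and merged back via task commit messages.
  var addedAbsPathFiles: Map[String, String] = null

  // Misleading on the driver: always false there, even when executors
  // did register absolute-path files.
  def hasAbsPathFilesViaMap: Boolean =
    addedAbsPathFiles != null && addedAbsPathFiles.nonEmpty

  // Robust: an absolute-path staging dir can only exist if a path was given.
  def hasAbsPathFilesViaPath: Boolean = path != null
}
```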

@mridulm (Contributor) commented Sep 21, 2017

@szhem Did you try this patch with SQL?
A cursory look at org.apache.spark.sql.execution.datasources.FileFormatWriter and org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand indicates use of methods in this class which will continue to throw NPEs?

+CC @ericl

@jiangxb1987 (Contributor)

IMO it should be fine to not provide an output directory if you are not using absolute output paths; I also don't think we should always create absolute output paths in HadoopMapReduceCommitProtocol. cc @cloud-fan for more input.

@mridulm (Contributor) commented Sep 21, 2017

+CC @weiqingy
Can you try this PR with SHC and see if it works?
That is, remove your current workaround for SPARK-21549 from SHC and try writing to HBase with a Spark version patched with this PR [1].
That will allow us to have a real-world test, and possibly surface issues.

@szhem Incorporating a test for the SQL part will also help in this matter.

[1] SHC will need the workaround even if this issue is resolved (since 2.2 has been released with this bug).

@steveloughran (Contributor)

As I play with commit logic all the way through the stack, I can't help thinking everyone's lives would be better if we tagged the MRv1 commit APIs as deprecated in Hadoop 3 and moved uses of the commit protocols fully onto the v2 committers: one codepath to get confused by, half as much complexity.

The issue with the custom stuff is inevitably Hive-related, isn't it? It's always liked to scatter data around a filesystem and pretend it's a single dataset.

@szhem (Contributor, Author) commented Sep 23, 2017

@mridulm

incorporating a test for the sql part will also help in this matter.

What should be the expected behaviour in the case of SQL?
I'm asking because the SQL part seems to fail even before setupJob is called on the committer.

FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))

@mridulm (Contributor) commented Sep 24, 2017

@szhem You are correct; currently it fails in the driver itself, so failures in the executors are not seen, since job submission fails.

With this PR the job submission should succeed, but the subsequent execution in SQL could fail (since SQL uses some of the methods which have not been patched in this PR, if I am not wrong: newTaskTempFileAbsPath, newTaskTempFile, etc.).

A test case to validate successful writes from a datasource in Spark SQL would clarify things.

… an absolute output location by means of checking whether the output path specified
SparkQA commented Sep 24, 2017

Test build #82131 has finished for PR 19294 at commit 3429de5.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@szhem (Contributor, Author) commented Sep 24, 2017

@mridulm Updated FileFormatWriterSuite to cover both branches of the committer calls, for newTaskTempFile as well as for newTaskTempFileAbsPath.

SparkQA commented Sep 24, 2017

Test build #82130 has finished for PR 19294 at commit ae0ba0a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Sep 24, 2017

Test build #82133 has finished for PR 19294 at commit 7963b58.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member)

If path could be null, this line will still fail with an error message like Can not create a Path from a null string.

In our Spark SQL code path, how can path be null?

@szhem (Contributor, Author) commented Oct 3, 2017

@gatorsmile I believe that in the Spark SQL code path path cannot be null, because in that case FileFormatWriter fails even before setupJob (which in turn calls setupCommitter) is called on the committer.

The interesting part is that Hadoop's FileOutputCommitter allows null output paths, and the line you highlighted is executed only in the case of FileOutputCommitter implementations.

So there may be a chance that someone would like to use a custom implementation of FileOutputCommitter, which allows nulls according to the Hadoop docs, with Spark SQL. I believe it is not an issue at all, because Spark SQL does not allow nulls.

SparkQA commented Oct 3, 2017

Test build #82411 has finished for PR 19294 at commit ff7b084.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member)

retest this please

@steveloughran (Contributor)

@szhem That null-path support in FileOutputCommitter came with the App Master recovery work of MAPREDUCE-3711; it's trying to minimise the amount of HDFS IO done during the recovery process.

I don't think that's a codepath Spark goes near; in the normal execution paths, FileOutputFormat and FileOutputCommitter will need output paths.

(Disclaimer: the more I read of that code, the less I understand it. Do not treat my opinions as normative in any way.)

SparkQA commented Oct 3, 2017

Test build #82412 has finished for PR 19294 at commit ff7b084.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mridulm (Contributor) commented Oct 6, 2017

@gatorsmile Have your concerns been addressed? If yes, I will merge this into master and 2.2.1.

This patch is clearly better than the existing state for 2.2 and master, for Spark core and some of the data sources I tested with.

@gatorsmile (Member)

Since this is not related to Spark SQL, please do not add the test cases to the Spark SQL side.

@mridulm (Contributor) commented Oct 6, 2017

@gatorsmile Sounds good. @szhem, can we remove the Spark SQL tests you added (due to my request)?
Once the build passes, I will commit this; it will definitely help Spark core users.

SparkQA commented Oct 6, 2017

Test build #82504 has finished for PR 19294 at commit e41abc6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@szhem (Contributor, Author) commented Oct 6, 2017

@mridulm The SQL-related tests were removed.

@jiangxb1987 (Contributor) left a comment

LGTM

/**
 * Checks whether there are files to be committed to an absolute output location.
 *
 * As the committing and aborting the job occurs on driver where `addedAbsPathFiles` is always
Member

As the committing -> As committing

Contributor Author

done

 *
 * As the committing and aborting the job occurs on driver where `addedAbsPathFiles` is always
 * null, it is necessary to check whether the output path is specified, that may not be the case
 * for committers not writing to distributed file systems.
Member

This also has a grammar issue. It is not clear, either.

@mridulm (Contributor) Oct 6, 2017

How about :
"As committing and aborting a job occurs on driver, where addedAbsPathFiles is always null, it is necessary to check whether the output path is specified. Output path may not be required for committers not writing to distributed file systems"

Member

This is much better.

Contributor Author

Thanks a lot, guys! I've just updated the comment.

SparkQA commented Oct 7, 2017

Test build #82529 has finished for PR 19294 at commit f55b7c2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Oct 7, 2017
…ovided


Author: Sergey Zhemzhitsky <szhemzhitski@gmail.com>

Closes #19294 from szhem/SPARK-21549-abs-output-commits.

(cherry picked from commit 2030f19)
Signed-off-by: Mridul Muralidharan <mridul@gmail.com>
@asfgit asfgit closed this in 2030f19 Oct 7, 2017
@mridulm (Contributor) commented Oct 7, 2017

Thanks for the fix @szhem, great work!
Merged to master and 2.2.1.

asfgit pushed a commit that referenced this pull request Oct 16, 2017
…ctory provided

## What changes were proposed in this pull request?

PR #19294 added support for nulls, but Spark 2.1 handled other error cases where the path argument can be invalid.
Namely:

* empty string
* URI parse exception while creating Path

This is resubmission of PR #19487, which I messed up while updating my repo.

## How was this patch tested?

Enhanced test to cover new support added.

Author: Mridul Muralidharan <mridul@gmail.com>

Closes #19497 from mridulm/master.

(cherry picked from commit 13c1559)
Signed-off-by: Mridul Muralidharan <mridul@gmail.com>
asfgit pushed a commit that referenced this pull request Oct 16, 2017
…ctory provided

MatthewRBruce pushed a commit to Shopify/spark that referenced this pull request Jul 31, 2018
…ovided

MatthewRBruce pushed a commit to Shopify/spark that referenced this pull request Jul 31, 2018
…ctory provided

9 participants