
Commit

[SPARK-21549][CORE] Respect OutputFormats with no output directory provided

## What changes were proposed in this pull request?

Fix for JIRA issue https://issues.apache.org/jira/browse/SPARK-21549.

Since version 2.2, Spark does not respect OutputFormats with no output path provided.
Examples of such formats are the [Cassandra OutputFormat](https://github.com/finn-no/cassandra-hadoop/blob/08dfa3a7ac727bb87269f27a1c82ece54e3f67e6/src/main/java/org/apache/cassandra/hadoop2/AbstractColumnFamilyOutputFormat.java), the [Aerospike OutputFormat](https://github.com/aerospike/aerospike-hadoop/blob/master/mapreduce/src/main/java/com/aerospike/hadoop/mapreduce/AerospikeOutputFormat.java), etc., which have no ability to roll back results already written to external systems on job failure.
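
For illustration, here is a minimal sketch (not part of this patch; all names are hypothetical) of such an OutputFormat: it pushes records straight to an external store, so it never configures an output directory and its committer has nothing to roll back.

```scala
import java.lang.{Integer => JInt}

import org.apache.hadoop.mapreduce._

// Hypothetical OutputFormat modelled on the Cassandra/Aerospike ones:
// records go to an external system, so no output directory is ever set.
class ExternalStoreOutputFormat extends OutputFormat[JInt, JInt] {

  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[JInt, JInt] =
    new RecordWriter[JInt, JInt] {
      override def write(key: JInt, value: JInt): Unit = {
        // push (key, value) to the external system here
      }
      override def close(context: TaskAttemptContext): Unit = ()
    }

  // Nothing to validate: there is no output path involved.
  override def checkOutputSpecs(context: JobContext): Unit = ()

  // A no-op committer: writes to the external system cannot be rolled back.
  override def getOutputCommitter(context: TaskAttemptContext): OutputCommitter =
    new OutputCommitter {
      override def setupJob(jobContext: JobContext): Unit = ()
      override def setupTask(taskContext: TaskAttemptContext): Unit = ()
      override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
      override def commitTask(taskContext: TaskAttemptContext): Unit = ()
      override def abortTask(taskContext: TaskAttemptContext): Unit = ()
    }
}
```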

Spark requires an output directory so that files can be committed to an absolute output location; that is not the case for output formats which write data to external systems.

This pull request avoids accessing the `absPathStagingDir` method, which causes the error described in SPARK-21549, unless there are files to rename in `addedAbsPathFiles`.
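
As a rough usage sketch (assuming the hypothetical `ExternalStoreOutputFormat` above and a SparkContext `sc`, e.g. in spark-shell), this is the kind of call that previously failed on the driver while committing the job and should now complete, since the staging directory is only touched when an output path was provided:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job

val pairs = sc.parallelize(Seq((1: java.lang.Integer, 2: java.lang.Integer)), 1)

val job = Job.getInstance(new Configuration(sc.hadoopConfiguration))
job.setOutputKeyClass(classOf[java.lang.Integer])
job.setOutputValueClass(classOf[java.lang.Integer])
job.setOutputFormatClass(classOf[ExternalStoreOutputFormat])

// Before this fix: java.lang.IllegalArgumentException: Can not create a Path from a null string
// (raised when the commit protocol builds absPathStagingDir from a null output path).
pairs.saveAsNewAPIHadoopDataset(job.getConfiguration)
```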

## How was this patch tested?

Unit tests

Author: Sergey Zhemzhitsky <szhemzhitski@gmail.com>

Closes apache#19294 from szhem/SPARK-21549-abs-output-commits.
szhem authored and mridulm committed Oct 7, 2017
1 parent debcbec commit 2030f19
Showing 2 changed files with 54 additions and 7 deletions.
HadoopMapReduceCommitProtocol.scala
@@ -35,6 +35,9 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
* (from the newer mapreduce API, not the old mapred API).
*
* Unlike Hadoop's OutputCommitter, this implementation is serializable.
*
* @param jobId the job's or stage's id
* @param path the job's output path, or null if committer acts as a noop
*/
class HadoopMapReduceCommitProtocol(jobId: String, path: String)
extends FileCommitProtocol with Serializable with Logging {
@@ -57,6 +60,15 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
*/
private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId)

/**
* Checks whether there are files to be committed to an absolute output location.
*
* As committing and aborting a job occurs on driver, where `addedAbsPathFiles` is always null,
* it is necessary to check whether the output path is specified. Output path may not be required
* for committers not writing to distributed file systems.
*/
private def hasAbsPathFiles: Boolean = path != null

protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
val format = context.getOutputFormatClass.newInstance()
// If OutputFormat is Configurable, we should set conf to it.
@@ -130,17 +142,21 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]])
.foldLeft(Map[String, String]())(_ ++ _)
logDebug(s"Committing files staged for absolute locations $filesToMove")
-    val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
-    for ((src, dst) <- filesToMove) {
-      fs.rename(new Path(src), new Path(dst))
+    if (hasAbsPathFiles) {
+      val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
+      for ((src, dst) <- filesToMove) {
+        fs.rename(new Path(src), new Path(dst))
+      }
+      fs.delete(absPathStagingDir, true)
     }
-    fs.delete(absPathStagingDir, true)
   }

override def abortJob(jobContext: JobContext): Unit = {
committer.abortJob(jobContext, JobStatus.State.FAILED)
-    val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
-    fs.delete(absPathStagingDir, true)
+    if (hasAbsPathFiles) {
+      val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
+      fs.delete(absPathStagingDir, true)
+    }
}

override def setupTask(taskContext: TaskAttemptContext): Unit = {
PairRDDFunctionsSuite.scala
@@ -26,7 +26,7 @@ import org.apache.commons.math3.distribution.{BinomialDistribution, PoissonDistr
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.mapred._
-import org.apache.hadoop.mapreduce.{JobContext => NewJobContext,
+import org.apache.hadoop.mapreduce.{Job => NewJob, JobContext => NewJobContext,
OutputCommitter => NewOutputCommitter, OutputFormat => NewOutputFormat,
RecordWriter => NewRecordWriter, TaskAttemptContext => NewTaskAttempContext}
import org.apache.hadoop.util.Progressable
@@ -568,6 +568,37 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
assert(FakeWriterWithCallback.exception.getMessage contains "failed to write")
}

test("saveAsNewAPIHadoopDataset should respect empty output directory when " +
"there are no files to be committed to an absolute output location") {
val pairs = sc.parallelize(Array((new Integer(1), new Integer(2))), 1)

val job = NewJob.getInstance(new Configuration(sc.hadoopConfiguration))
job.setOutputKeyClass(classOf[Integer])
job.setOutputValueClass(classOf[Integer])
job.setOutputFormatClass(classOf[NewFakeFormat])
val jobConfiguration = job.getConfiguration

// just test that the job does not fail with
// java.lang.IllegalArgumentException: Can not create a Path from a null string
pairs.saveAsNewAPIHadoopDataset(jobConfiguration)
}

test("saveAsHadoopDataset should respect empty output directory when " +
"there are no files to be committed to an absolute output location") {
val pairs = sc.parallelize(Array((new Integer(1), new Integer(2))), 1)

val conf = new JobConf()
conf.setOutputKeyClass(classOf[Integer])
conf.setOutputValueClass(classOf[Integer])
conf.setOutputFormat(classOf[FakeOutputFormat])
conf.setOutputCommitter(classOf[FakeOutputCommitter])

FakeOutputCommitter.ran = false
pairs.saveAsHadoopDataset(conf)

assert(FakeOutputCommitter.ran, "OutputCommitter was never called")
}

test("lookup") {
val pairs = sc.parallelize(Array((1, 2), (3, 4), (5, 6), (5, 7)))

