[SPARK-21549][CORE] Respect OutputFormats with no/invalid output directory provided

## What changes were proposed in this pull request?

PR apache#19294 added support for null paths, but Spark 2.1 also handled other error cases where the path argument can be invalid (see the sketch below the list).
Namely:

* empty string
* URI parse exception while creating Path
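
For illustration, a minimal sketch (not part of the commit) of how these inputs behave when constructing a Hadoop `Path`; the exact exception messages depend on the Hadoop version:

```scala
import scala.util.Try

import org.apache.hadoop.fs.Path

// All three inputs make `new Path(...)` throw an IllegalArgumentException
// (null/empty-string checks, or a wrapped URISyntaxException), so a plain
// null check is not enough to detect an unusable output location.
Seq(null, "", "::invalid::").foreach { p =>
  val parses = Try { new Path(p) }.isSuccess
  println(s"path=$p parses=$parses") // expected: false for all three
}
```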

This is a resubmission of PR apache#19487, which I messed up while updating my repo.

## How was this patch tested?

Enhanced the existing test to cover the newly added support.
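
For context, a hedged usage sketch of the scenario the test exercises. Here `sc` is assumed to be an active SparkContext, `pairs` a pair RDD, and `NoOpOutputFormat` a hypothetical `OutputFormat` that writes to an external system and ignores the output directory:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job

val job = Job.getInstance(new Configuration(sc.hadoopConfiguration))
job.setOutputKeyClass(classOf[Integer])
job.setOutputValueClass(classOf[Integer])
// Hypothetical OutputFormat that writes to an external store and
// never reads mapred.output.dir.
job.setOutputFormatClass(classOf[NoOpOutputFormat])
// mapred.output.dir is deliberately left unset / empty / invalid here;
// with this change the commit protocol no longer requires it to parse
// as a filesystem path.
pairs.saveAsNewAPIHadoopDataset(job.getConfiguration)
```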

Author: Mridul Muralidharan <mridul@gmail.com>

Closes apache#19497 from mridulm/master.

(cherry picked from commit 13c1559)
Signed-off-by: Mridul Muralidharan <mridul@gmail.com>
mridulm authored and MatthewRBruce committed Jul 31, 2018
1 parent 6acc25c commit e747b36
Showing 2 changed files with 35 additions and 20 deletions.
@@ -20,6 +20,7 @@ package org.apache.spark.internal.io
import java.util.{Date, UUID}

import scala.collection.mutable
import scala.util.Try

import org.apache.hadoop.conf.Configurable
import org.apache.hadoop.fs.Path
@@ -47,6 +48,16 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
/** OutputCommitter from Hadoop is not serializable so marking it transient. */
@transient private var committer: OutputCommitter = _

/**
* Checks whether there are files to be committed to a valid output location.
*
* As committing and aborting a job occurs on the driver, where `addedAbsPathFiles` is always null,
* it is necessary to check whether a valid output path is specified.
* [[HadoopMapReduceCommitProtocol#path]] need not be a valid [[org.apache.hadoop.fs.Path]] for
* committers not writing to distributed file systems.
*/
private val hasValidPath = Try { new Path(path) }.isSuccess

/**
* Tracks files staged by this task for absolute output paths. These outputs are not managed by
* the Hadoop OutputCommitter, so we must move these to their final locations on job commit.
@@ -60,15 +71,6 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
*/
private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId)

/**
* Checks whether there are files to be committed to an absolute output location.
*
* As committing and aborting a job occurs on driver, where `addedAbsPathFiles` is always null,
* it is necessary to check whether the output path is specified. Output path may not be required
* for committers not writing to distributed file systems.
*/
private def hasAbsPathFiles: Boolean = path != null

protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
val format = context.getOutputFormatClass.newInstance()
// If OutputFormat is Configurable, we should set conf to it.
@@ -141,7 +143,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]])
.foldLeft(Map[String, String]())(_ ++ _)
logDebug(s"Committing files staged for absolute locations $filesToMove")
if (hasAbsPathFiles) {
if (hasValidPath) {
val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
for ((src, dst) <- filesToMove) {
fs.rename(new Path(src), new Path(dst))
@@ -152,7 +154,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)

override def abortJob(jobContext: JobContext): Unit = {
committer.abortJob(jobContext, JobStatus.State.FAILED)
if (hasAbsPathFiles) {
if (hasValidPath) {
val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
fs.delete(absPathStagingDir, true)
}
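To make the behavioural difference concrete, a small comparison sketch (hypothetical helper names, not part of the patch) of the old `path != null` guard against the new `Try`-based one:

```scala
import scala.util.Try

import org.apache.hadoop.fs.Path

def oldGuard(path: String): Boolean = path != null                      // pre-patch check
def newGuard(path: String): Boolean = Try { new Path(path) }.isSuccess  // post-patch check

// null          -> oldGuard: false, newGuard: false
// ""            -> oldGuard: true,  newGuard: false (Path rejects empty strings)
// "::invalid::" -> oldGuard: true,  newGuard: false (URI parse failure)
// "/tmp/out"    -> oldGuard: true,  newGuard: true
```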
@@ -568,21 +568,34 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
assert(FakeWriterWithCallback.exception.getMessage contains "failed to write")
}

test("saveAsNewAPIHadoopDataset should respect empty output directory when " +
test("saveAsNewAPIHadoopDataset should support invalid output paths when " +
"there are no files to be committed to an absolute output location") {
val pairs = sc.parallelize(Array((new Integer(1), new Integer(2))), 1)

val job = NewJob.getInstance(new Configuration(sc.hadoopConfiguration))
job.setOutputKeyClass(classOf[Integer])
job.setOutputValueClass(classOf[Integer])
job.setOutputFormatClass(classOf[NewFakeFormat])
val jobConfiguration = job.getConfiguration
def saveRddWithPath(path: String): Unit = {
val job = NewJob.getInstance(new Configuration(sc.hadoopConfiguration))
job.setOutputKeyClass(classOf[Integer])
job.setOutputValueClass(classOf[Integer])
job.setOutputFormatClass(classOf[NewFakeFormat])
if (null != path) {
job.getConfiguration.set("mapred.output.dir", path)
} else {
job.getConfiguration.unset("mapred.output.dir")
}
val jobConfiguration = job.getConfiguration

// just test that the job does not fail with java.lang.IllegalArgumentException.
pairs.saveAsNewAPIHadoopDataset(jobConfiguration)
}

// just test that the job does not fail with
// java.lang.IllegalArgumentException: Can not create a Path from a null string
pairs.saveAsNewAPIHadoopDataset(jobConfiguration)
saveRddWithPath(null)
saveRddWithPath("")
saveRddWithPath("::invalid::")
}

// In Spark 2.1, only null was supported - not other invalid paths.
// org.apache.hadoop.mapred.FileOutputFormat.getOutputPath fails with IllegalArgumentException
// for non-null invalid paths.
test("saveAsHadoopDataset should respect empty output directory when " +
"there are no files to be committed to an absolute output location") {
val pairs = sc.parallelize(Array((new Integer(1), new Integer(2))), 1)
