[SPARK-6369] [SQL] Uses commit coordinator to help committing Hive and Parquet tables #5139

Closed. Wants to merge 3 commits.

Changes from 1 commit
52 changes: 2 additions & 50 deletions core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -26,7 +26,6 @@ import org.apache.hadoop.mapred._
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.HadoopRDD

@@ -104,55 +103,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
}

def commit() {
val taCtxt = getTaskContext()
val cmtr = getOutputCommitter()

// Called after we have decided to commit
def performCommit(): Unit = {
try {
cmtr.commitTask(taCtxt)
logInfo (s"$taID: Committed")
} catch {
case e: IOException =>
logError("Error committing the output of task: " + taID.value, e)
cmtr.abortTask(taCtxt)
throw e
}
}

// First, check whether the task's output has already been committed by some other attempt
if (cmtr.needsTaskCommit(taCtxt)) {
// The task output needs to be committed, but we don't know whether some other task attempt
// might be racing to commit the same output partition. Therefore, coordinate with the driver
// in order to determine whether this attempt can commit (see SPARK-4879).
val shouldCoordinateWithDriver: Boolean = {
val sparkConf = SparkEnv.get.conf
// We only need to coordinate with the driver if there are multiple concurrent task
// attempts, which should only occur if speculation is enabled
val speculationEnabled = sparkConf.getBoolean("spark.speculation", false)
// This (undocumented) setting is an escape-hatch in case the commit code introduces bugs
sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
}
if (shouldCoordinateWithDriver) {
val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
val canCommit = outputCommitCoordinator.canCommit(jobID, splitID, attemptID)
if (canCommit) {
performCommit()
} else {
val msg = s"$taID: Not committed because the driver did not authorize commit"
logInfo(msg)
// We need to abort the task so that the driver can reschedule new attempts, if necessary
cmtr.abortTask(taCtxt)
throw new CommitDeniedException(msg, jobID, splitID, attemptID)
}
} else {
// Speculation is disabled or a user has chosen to manually bypass the commit coordination
performCommit()
}
} else {
// Some other attempt committed the output, so we do nothing and signal success
logInfo(s"No need to commit output of task because needsTaskCommit=false: ${taID.value}")
}
SparkHadoopMapRedUtil.commitTask(
getOutputCommitter(), getTaskContext(), jobID, splitID, attemptID)
}

def commitJob() {
@@ -17,9 +17,15 @@

package org.apache.spark.mapred

import java.io.IOException
import java.lang.reflect.Modifier

import org.apache.hadoop.mapred.{TaskAttemptID, JobID, JobConf, JobContext, TaskAttemptContext}
import org.apache.hadoop.mapred._
import org.apache.hadoop.mapreduce.{TaskAttemptContext => MapReduceTaskAttemptContext}
import org.apache.hadoop.mapreduce.{OutputCommitter => MapReduceOutputCommitter}

import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.{Logging, SparkEnv, TaskContext}

private[spark]
trait SparkHadoopMapRedUtil {
@@ -65,3 +71,77 @@ trait SparkHadoopMapRedUtil {
}
}
}

object SparkHadoopMapRedUtil extends Logging {
def commitTask(
Contributor
nit: maybe place this overloaded version below the main one as it is merely a convenience method (also allows us to avoid duplicating the documentation)

committer: MapReduceOutputCommitter,
mrTaskContext: MapReduceTaskAttemptContext,
sparkTaskContext: TaskContext): Unit = {
commitTask(
committer,
mrTaskContext,
sparkTaskContext.stageId(),
sparkTaskContext.partitionId(),
sparkTaskContext.attemptNumber())
}

def commitTask(
Contributor
Please add documentation to this guy mentioning what it means to commitTask (i.e., we may contact the driver to become authorized to commit to ensure speculative tasks do not override each other, and that this may cause us to abort the task by throwing a CommitDeniedException if we cannot become authorized as such), pointing to the JIRA that this fixes (the original one).
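One possible shape for that Scaladoc, stitched together from the inline comments already present in this diff. This is only a sketch of what such documentation could look like, not necessarily the wording that was eventually committed:

/**
 * Commits a task output.  Before committing the task output, we need to know whether some other
 * task attempt might be racing to commit the same output partition.  Therefore, coordinate with
 * the driver in order to determine whether this attempt can commit (please see SPARK-4879 for
 * details).
 *
 * Commit coordination is only attempted when speculation is enabled (or when the undocumented
 * spark.hadoop.outputCommitCoordination.enabled setting forces it on).  If the driver refuses to
 * authorize this attempt, the task output is aborted and a [[CommitDeniedException]] is thrown
 * so that the driver can reschedule new attempts if necessary.
 */
def commitTask(
    committer: MapReduceOutputCommitter,
    mrTaskContext: MapReduceTaskAttemptContext,
    jobId: Int,
    splitId: Int,
    attemptId: Int): Unit = {
  // ... body unchanged from the implementation shown below ...
}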

committer: MapReduceOutputCommitter,
mrTaskContext: MapReduceTaskAttemptContext,
jobId: Int,
splitId: Int,
attemptId: Int): Unit = {

val mrTaskAttemptID = mrTaskContext.getTaskAttemptID

// Called after we have decided to commit
def performCommit(): Unit = {
try {
committer.commitTask(mrTaskContext)
logInfo(s"$mrTaskAttemptID: Committed")
} catch {
case cause: IOException =>
logError(s"Error committing the output of task: $mrTaskAttemptID", cause)
committer.abortTask(mrTaskContext)
throw cause
}
}

// First, check whether the task's output has already been committed by some other attempt
if (committer.needsTaskCommit(mrTaskContext)) {
// The task output needs to be committed, but we don't know whether some other task attempt
Contributor
In fact, it may be sufficient to lift and modify this comment up to the javadoc.

// might be racing to commit the same output partition. Therefore, coordinate with the driver
// in order to determine whether this attempt can commit (see SPARK-4879).
val shouldCoordinateWithDriver: Boolean = {
val sparkConf = SparkEnv.get.conf
// We only need to coordinate with the driver if there are multiple concurrent task
// attempts, which should only occur if speculation is enabled
val speculationEnabled = sparkConf.getBoolean("spark.speculation", defaultValue = false)
// This (undocumented) setting is an escape-hatch in case the commit code introduces bugs
sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
}

if (shouldCoordinateWithDriver) {
val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, attemptId)

if (canCommit) {
performCommit()
} else {
val message =
s"$mrTaskAttemptID: Not committed because the driver did not authorize commit"
logInfo(message)
// We need to abort the task so that the driver can reschedule new attempts, if necessary
committer.abortTask(mrTaskContext)
throw new CommitDeniedException(message, jobId, splitId, attemptId)
}
} else {
// Speculation is disabled or a user has chosen to manually bypass the commit coordination
performCommit()
}
} else {
// Some other attempt committed the output, so we do nothing and signal success
logInfo(s"No need to commit output of task because needsTaskCommit=false: $mrTaskAttemptID")
}
}
}
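As a side note on the two settings referenced above: coordination kicks in when spark.speculation is enabled, unless the undocumented escape hatch overrides it. A hypothetical way a user could exercise that escape hatch from their own code (illustrative only; the SparkConf calls below are not part of this patch):

import org.apache.spark.SparkConf

// Speculation enables concurrent task attempts, so commit coordination would normally be on...
val conf = new SparkConf()
  .set("spark.speculation", "true")
  // ...but the undocumented escape hatch can bypass the commit coordinator entirely.
  .set("spark.hadoop.outputCommitCoordination.enabled", "false")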
@@ -19,10 +19,9 @@ package org.apache.spark.sql.parquet

import java.io.IOException
import java.lang.{Long => JLong}
import java.text.SimpleDateFormat
import java.text.NumberFormat
import java.text.{NumberFormat, SimpleDateFormat}
import java.util.concurrent.{Callable, TimeUnit}
import java.util.{ArrayList, Collections, Date, List => JList}
import java.util.{Date, List => JList}

import scala.collection.JavaConversions._
import scala.collection.mutable
@@ -43,12 +42,13 @@ import parquet.io.ParquetDecodingException
import parquet.schema.MessageType

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLConf
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row, _}
import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.types.StructType
import org.apache.spark.{Logging, SerializableWritable, TaskContext}

/**
@@ -356,7 +356,7 @@ private[sql] case class InsertIntoParquetTable(
} finally {
writer.close(hadoopContext)
}
committer.commitTask(hadoopContext)
SparkHadoopMapRedUtil.commitTask(committer, hadoopContext, context)
1
}
val jobFormat = new AppendingParquetOutputFormat(taskIdOffset)
@@ -512,6 +512,7 @@ private[parquet] class FilteringParquetRowInputFormat

import parquet.filter2.compat.FilterCompat.Filter
import parquet.filter2.compat.RowGroupFilter

import org.apache.spark.sql.parquet.FilteringParquetRowInputFormat.blockLocationCache

val cacheMetadata = configuration.getBoolean(SQLConf.PARQUET_CACHE_METADATA, true)
@@ -42,6 +42,7 @@ import parquet.hadoop.{ParquetInputFormat, _}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD, RDD}
import org.apache.spark.sql.catalyst.expressions
@@ -662,7 +663,8 @@ private[sql] case class ParquetRelation2(
} finally {
writer.close(hadoopContext)
}
committer.commitTask(hadoopContext)

SparkHadoopMapRedUtil.commitTask(committer, hadoopContext, context)
}
val jobFormat = new AppendingParquetOutputFormat(taskIdOffset)
/* apparently we need a TaskAttemptID to construct an OutputCommitter;
@@ -72,7 +72,6 @@ case class InsertIntoHiveTable(
val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName
assert(outputFileFormatClassName != null, "Output format class not set")
conf.value.set("mapred.output.format.class", outputFileFormatClassName)
conf.value.setOutputCommitter(classOf[FileOutputCommitter])

FileOutputFormat.setOutputPath(
conf.value,
@@ -17,7 +17,6 @@

package org.apache.spark.sql.hive

import java.io.IOException
import java.text.NumberFormat
import java.util.Date

@@ -118,19 +117,7 @@ private[hive] class SparkHiveWriterContainer(
}

protected def commit() {
if (committer.needsTaskCommit(taskContext)) {
try {
committer.commitTask(taskContext)
logInfo (taID + ": Committed")
} catch {
case e: IOException =>
logError("Error committing the output of task: " + taID.value, e)
committer.abortTask(taskContext)
throw e
}
} else {
logInfo("No need to commit output of task: " + taID.value)
}
SparkHadoopMapRedUtil.commitTask(committer, taskContext, jobID, splitID, attemptID)
}

private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
@@ -213,7 +200,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
.zip(row.toSeq.takeRight(dynamicPartColNames.length))
.map { case (col, rawVal) =>
val string = if (rawVal == null) null else String.valueOf(rawVal)
val colString =
val colString =
if (string == null || string.isEmpty) {
defaultPartName
} else {