[SPARK-41448] Make consistent MR job IDs in FileBatchWriter and FileFormatWriter

### What changes were proposed in this pull request?
Make consistent MR job IDs in FileBatchWriter and FileFormatWriter

### Why are the changes needed?

[SPARK-26873](https://issues.apache.org/jira/browse/SPARK-26873) fixed this consistency issue for FileFormatWriter, but [SPARK-33402](https://issues.apache.org/jira/browse/SPARK-33402) broke the guarantee again by introducing a random long into the job tracker ID. We need to address this because the commit protocol expects identical task IDs across attempts for correctness.

FileBatchWriter doesn't follow this requirement either and needs the same fix.
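
In other words (a minimal sketch, assuming it runs inside Spark, since `SparkHadoopWriterUtils` is an internal object; the stage number is illustrative): a retried or speculative attempt of a task must reconstruct exactly the same Hadoop job ID, which only holds if every attempt starts from the same job tracker ID string rather than re-generating one.

```scala
import java.util.Date

import org.apache.spark.internal.io.SparkHadoopWriterUtils

// Compute the tracker ID once (on the driver) and share it: every attempt
// then derives an identical JobID, as the commit protocol requires.
val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date())
val first = SparkHadoopWriterUtils.createJobID(jobTrackerID, 1)
val retry = SparkHadoopWriterUtils.createJobID(jobTrackerID, 1)
assert(first == retry)

// By contrast, deriving the ID independently per attempt is unstable,
// because createJobTrackerID mixes in a random long (SPARK-33402):
// two calls disagree even when given the same Date.
```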

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

Closes #38980 from boneanxs/SPARK-41448.

Authored-by: Hui An <hui.an@shopee.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Hui An authored and cloud-fan committed Dec 12, 2022
1 parent 1f4c8e4 commit 7801666
Showing 3 changed files with 20 additions and 7 deletions.
```diff
@@ -47,11 +47,22 @@ object SparkHadoopWriterUtils {
    * @return a job ID
    */
   def createJobID(time: Date, id: Int): JobID = {
+    val jobTrackerID = createJobTrackerID(time)
+    createJobID(jobTrackerID, id)
+  }
+
+  /**
+   * Create a job ID.
+   *
+   * @param jobTrackerID unique job track id
+   * @param id job number
+   * @return a job ID
+   */
+  def createJobID(jobTrackerID: String, id: Int): JobID = {
     if (id < 0) {
       throw new IllegalArgumentException("Job number is negative")
     }
-    val jobtrackerID = createJobTrackerID(time)
-    new JobID(jobtrackerID, id)
+    new JobID(jobTrackerID, id)
   }
 
   /**
```
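
For reference, a hedged reconstruction of why the `Date`-based path is non-deterministic: since SPARK-33402, `createJobTrackerID` (unchanged by this patch and not shown above) appends a random long to the formatted timestamp. The sketch below is an approximation of that described behavior, not the actual Spark source.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

import scala.util.Random

val random = new Random()

// Approximate shape only: a timestamp prefix plus a random, non-negative
// long suffix, so two calls with the same Date still return different IDs.
def createJobTrackerID(time: Date): String = {
  val base = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
  base + (random.nextLong() & Long.MaxValue)
}
```
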
```diff
@@ -201,14 +201,14 @@ object FileFormatWriter extends Logging {
       rdd
     }
 
-    val jobIdInstant = new Date().getTime
+    val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date())
     val ret = new Array[WriteTaskResult](rddWithNonEmptyPartitions.partitions.length)
     sparkSession.sparkContext.runJob(
       rddWithNonEmptyPartitions,
       (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
         executeTask(
           description = description,
-          jobIdInstant = jobIdInstant,
+          jobTrackerID = jobTrackerID,
           sparkStageId = taskContext.stageId(),
           sparkPartitionId = taskContext.partitionId(),
           sparkAttemptNumber = taskContext.taskAttemptId().toInt & Integer.MAX_VALUE,
@@ -244,15 +244,15 @@ object FileFormatWriter extends Logging {
   /** Writes data out in a single Spark task. */
   private def executeTask(
       description: WriteJobDescription,
-      jobIdInstant: Long,
+      jobTrackerID: String,
       sparkStageId: Int,
       sparkPartitionId: Int,
       sparkAttemptNumber: Int,
       committer: FileCommitProtocol,
       iterator: Iterator[InternalRow],
       concurrentOutputWriterSpec: Option[ConcurrentOutputWriterSpec]): WriteTaskResult = {
 
-    val jobId = SparkHadoopWriterUtils.createJobID(new Date(jobIdInstant), sparkStageId)
+    val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, sparkStageId)
     val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
     val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
```
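
The net effect in the task (a sketch with illustrative stage and partition numbers; the names follow the diff): all attempts of one partition now share the same `TaskID` and differ only in the attempt number, which is exactly the identity the commit protocol needs to match attempts of the same task.

```scala
import java.util.Date

import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskID, TaskType}

import org.apache.spark.internal.io.SparkHadoopWriterUtils

val jobTrackerID = SparkHadoopWriterUtils.createJobTrackerID(new Date())
val sparkStageId = 3     // illustrative
val sparkPartitionId = 7 // illustrative

// Every attempt rebuilds the same JobID/TaskID from the shared string...
val jobId = SparkHadoopWriterUtils.createJobID(jobTrackerID, sparkStageId)
val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)

// ...so a retry differs from the first attempt only in attempt number.
val firstAttempt = new TaskAttemptID(taskId, 0)
val retry = new TaskAttemptID(taskId, 1)
assert(firstAttempt.getTaskID == retry.getTaskID)
```
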
```diff
@@ -29,6 +29,9 @@ import org.apache.spark.sql.execution.datasources.{DynamicPartitionDataSingleWri
 case class FileWriterFactory (
     description: WriteJobDescription,
     committer: FileCommitProtocol) extends DataWriterFactory {
+
+  private val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
+
   override def createWriter(partitionId: Int, realTaskId: Long): DataWriter[InternalRow] = {
     val taskAttemptContext = createTaskAttemptContext(partitionId)
     committer.setupTask(taskAttemptContext)
@@ -40,7 +43,6 @@ case class FileWriterFactory (
   }
 
   private def createTaskAttemptContext(partitionId: Int): TaskAttemptContextImpl = {
-    val jobId = SparkHadoopWriterUtils.createJobID(new Date, 0)
     val taskId = new TaskID(jobId, TaskType.MAP, partitionId)
     val taskAttemptId = new TaskAttemptID(taskId, 0)
     // Set up the configuration object
```
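
The `FileWriterFactory` change applies the same principle: previously `createTaskAttemptContext` minted a fresh job ID (with a fresh random tracker component) on every call, i.e. once per task attempt, while hoisting it into a `private val` gives one job ID per factory instance, shared by every writer the factory creates. A toy illustration of that val-versus-per-call distinction (not Spark code):

```scala
import scala.util.Random

class OncePerInstance {
  private val id = Random.nextLong()    // fixed when the instance is built
  def stable(): Long = id               // same value on every call
  def fresh(): Long = Random.nextLong() // new value on every call
}
```
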
