Skip to content

Commit

Permalink
Different approach to generate unique task ID.
Browse files Browse the repository at this point in the history
  • Loading branch information
Marcelo Vanzin committed Jun 22, 2018
1 parent 227d513 commit a16d9f9
Showing 1 changed file with 4 additions and 16 deletions.
Expand Up @@ -76,29 +76,17 @@ object SparkHadoopWriter extends Logging {
// Try to write all RDD partitions as a Hadoop OutputFormat.
try {
val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
// Generate a positive integer task ID that is unique for the current stage. This makes a
// few assumptions:
// - the task ID is always positive
// - stages cannot have more than Int.MaxValue
// - the sum of task counts of all active stages doesn't exceed Int.MaxValue
//
// The first two are currently the case in Spark, while the last one is very unlikely to
// occur. If it does, two tasks IDs on a single stage could have a clashing integer value,
// which could lead to code that generates clashing file names for different tasks. Still,
// if the commit coordinator is enabled, only one task would be allowed to commit.
var taskId = context.taskAttemptId
while (taskId > Int.MaxValue) {
taskId -= Int.MaxValue
}
val stageTaskId = taskId.toInt
// SPARK-24552: Generate a unique "task ID" based on the stage and task atempt numbers.
// Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently.
val taskId = (context.stageAttemptNumber << 16) | context.attemptNumber

executeTask(
context = context,
config = config,
jobTrackerId = jobTrackerId,
commitJobId = commitJobId,
sparkPartitionId = context.partitionId,
sparkTaskId = stageTaskId,
sparkTaskId = taskId,
committer = committer,
iterator = iter)
})
Expand Down

0 comments on commit a16d9f9

Please sign in to comment.