
[SPARK-24552][core][SQL] Use task ID instead of attempt number for writes. #21606

Closed · wants to merge 13 commits

Conversation

@vanzin (Contributor) commented Jun 21, 2018

This passes the unique task attempt id instead of attempt number to v2 data sources because attempt number is reused when stages are retried. When attempt numbers are reused, sources that track data by partition id and attempt number may incorrectly clean up data because the same attempt number can be both committed and aborted.

For v1 / Hadoop writes, generate a unique ID based on available attempt numbers to avoid a similar problem.

Closes #21558
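To make the failure mode concrete, here is a minimal sketch (hypothetical names, not code from this patch) of why keying output on (partitionId, attemptNumber) is ambiguous across stage retries:

// A stage attempt runs partition 3 as task attempt number 0; the stage is then
// retried, and the new stage attempt runs partition 3 again, also as attempt 0.
// A sink keyed on (partitionId, attemptNumber) cannot tell the two apart, so
// aborting one attempt may clean up data the other attempt already committed.
case class WriteKey(partitionId: Int, attemptNumber: Int)

val committed = WriteKey(partitionId = 3, attemptNumber = 0) // first stage attempt
val aborted = WriteKey(partitionId = 3, attemptNumber = 0)   // retried stage attempt
assert(committed == aborted) // same key: the two attempts are indistinguishable

The globally unique task ID (TID) does not have this problem, because no two task attempts in an application ever share it.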

@vanzin (Author) commented Jun 21, 2018

Credit here should go to @rdblue when merging.

  committer: FileCommitProtocol,
  iterator: Iterator[(K, V)]): TaskCommitMessage = {
    // Set up a task.
    val taskContext = config.createTaskAttemptContext(
-     jobTrackerId, commitJobId, sparkPartitionId, sparkAttemptNumber)
+     jobTrackerId, commitJobId, sparkPartitionId, sparkTaskId.toInt)
Contributor:

is it safe?

@cloud-fan (Contributor) commented Jun 21, 2018

The task ID is unique across the entire Spark application life cycle, which means we may end up with very large task IDs in a long-running micro-batch streaming application.

If we do need an int here, I'd suggest we combine stageAttemptNumber and taskAttemptNumber into an int, which is much less risky. (Spark won't have a lot of stage/task attempts.)

@vanzin (Author):

Streaming still generates separate jobs / stages for each batch, right?

In that case this should be fine; this would only be a problem if a single stage had enough tasks to cover the whole integer space (about 4 billion tasks). That shouldn't even be possible, since I doubt you could have more than Integer.MAX_VALUE tasks (and even that is unlikely to ever happen).

I could use abs here (and in the sql code) to avoid a negative value (potentially avoiding weird file names).

Contributor:

I don't follow; task IDs increment across jobs, so if you have a very long-running application that continues to start new jobs, you could potentially run out.

@vanzin (Author):

But what does "run out" mean?

If your task ID goes past Int.MaxValue, you'll start getting negative values here. Eventually you'll reach a long value that wraps back to 0 when cast to an integer:

scala> (2L + Int.MaxValue + Int.MaxValue).toInt
res2: Int = 0
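For the negative range mentioned above, a quick REPL check (example values, standard Long-to-Int truncation in Scala):

scala> (1L + Int.MaxValue).toInt
res0: Int = -2147483648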

So for this to "not work", you would need a conflict where two tasks generate the same output file name based on all these values (stage, task, partition, etc.); that would require about 4 billion tasks in the same stage.

In other situations, you may get weird values because of the cast, but it should still work.

Contributor:

Ah, I see what you are saying; we just need to make sure it going negative doesn't cause any side effects or anything unexpected.

@rdblue (Contributor) commented Jun 22, 2018

I commented before I saw this thread, but I think it is better to use the TID, because that is already exposed in the UI and so is better for tracking between UI tasks and logs. The combined attempt number isn't used anywhere, so this would introduce yet another number to identify a task. And shifting by 16 means that these grow huge anyway.

Contributor:

To backport this, can we use the .toInt version? I think that should be safe.

   * @param epochId A monotonically increasing id for streaming queries that are split in to
   *                discrete periods of execution. For non-streaming queries,
   *                this ID will always be 0.
   */
- DataWriter<T> createDataWriter(int partitionId, int attemptNumber, long epochId);
+ DataWriter<T> createDataWriter(int partitionId, int taskId, long epochId);
Contributor:

In SparkHadoopWriter we must have an int, but here why not just change the type to long? Data source v2 is still evolving, and we have already made a lot of changes in the master branch.

@vanzin (Author) commented Jun 21, 2018

I'm fine with that if you're OK with it, but wouldn't that make backporting this to 2.3 a little fishy? Yeah, it's evolving, but it's still a little sub-optimal to break things in a maintenance release.

Contributor:

If this patch targets 2.3, I'd say we should not change any API or documentation; just pass taskId.toInt as attemptNumber and add comments to explain this hacky workaround.

@vanzin (Author):

Just so I understand, what's the reason for not changing the parameter name and API docs? The name is not part of the public API in Java, so changing it doesn't break anything.

And regardless of the parameter name, the API documentation is wrong (it says you can have multiple tasks with the same ID but different attempt numbers, which does not happen).

Contributor:

The V2 commit stuff is not in 2.3.


@vanzin (Author):

Hmm, interesting. But there is an API in 2.3:

DataWriter<T> createDataWriter(int partitionId, int attemptNumber);

which I guess would still suffer from the problem Ryan describes in the bug. In any case, that makes it impossible to cleanly backport this, so we can make the type change here.

Contributor:

+1 for the type change.

logInfo(s"Writer for stage $stageId / $stageAttempt, " +
s"task $partId.$attemptId is authorized to commit.")
logInfo(s"Writer for stage $stageId.$stageAttempt, " +
s"task $partId.$taskId is authorized to commit.")
Contributor:

This one we want to leave as attemptNumber.

  logInfo(message)
  // throwing CommitDeniedException will trigger the catch block for abort
- throw new CommitDeniedException(message, stageId, partId, attemptId)
+ throw new CommitDeniedException(message, stageId, partId, taskId)
Contributor:

I think these and the messages above should use the attempt number, to match the output committer.

@tgravescs (Contributor):

I guess it depends on how picky we want to be; there are other places that use attemptNumber that we could update to task ID: InternalRowDataWriterFactory, memoryV2, and SimpleWritableDataSource (the places that implement createDataWriter).

// The first two are currently the case in Spark, while the last one is very unlikely to
// occur. If it does, two task IDs on a single stage could have a clashing integer value,
// which could lead to code that generates clashing file names for different tasks. Still,
// if the commit coordinator is enabled, only one task would be allowed to commit.
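To make the clash concrete, an illustrative check (example values, not from the patch): two long task IDs that differ by exactly 2^32 collapse to the same integer.

// 1L and 1L + (1L << 32) differ by 2^32, so their low 32 bits are identical
// and .toInt maps them to the same value.
val tid1 = 1L
val tid2 = 1L + (1L << 32) // 4294967297
assert(tid1.toInt == tid2.toInt) // both become 1: a potential file-name clash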
Contributor:

Since it's not a simple toInt anymore, how about we combine the stage and task attempt numbers?

val stageAttemptNumber = ...
val taskAttemptNumber = ...
assert(stageAttemptNumber <= Short.MaxValue)
assert(taskAttemptNumber <= Short.MaxValue)
val sparkAttemptNumber = (stageAttemptNumber << 16) | taskAttemptNumber

Contributor:

We could also remove the asserts and assume that, even if we have that many attempts, they are not all active.

@vanzin (Author):

Ok, I'll use that. I think Spark might fail everything before you even go that high in attempt numbers anyway...

@SparkQA commented Jun 22, 2018

Test build #92181 has finished for PR 21606 at commit 7233a5f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 22, 2018

Test build #92187 has finished for PR 21606 at commit c884f4f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 22, 2018

Test build #92189 has finished for PR 21606 at commit 5efaae7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 22, 2018

Test build #92190 has finished for PR 21606 at commit 227d513.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 22, 2018

Test build #92192 has finished for PR 21606 at commit a16d9f9.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs (Contributor):

test this please

@@ -76,13 +76,17 @@ object SparkHadoopWriter extends Logging {
    // Try to write all RDD partitions as a Hadoop OutputFormat.
    try {
      val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
+       // SPARK-24552: Generate a unique "task ID" based on the stage and task attempt numbers.
+       // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently.
+       val taskId = (context.stageAttemptNumber << 16) | context.attemptNumber
Contributor:

Perhaps we should rename taskId to something more unique so we don't confuse it.

Contributor:

Maybe just something like uniqueTaskId or specialTaskId; not a big deal.

@SparkQA commented Jun 22, 2018

Test build #92214 has finished for PR 21606 at commit a16d9f9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs (Contributor):

+1

@@ -76,13 +76,17 @@ object SparkHadoopWriter extends Logging {
    // Try to write all RDD partitions as a Hadoop OutputFormat.
    try {
      val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
+       // SPARK-24552: Generate a unique "attempt ID" based on the stage and task attempt numbers.
+       // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently.
+       val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber
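As a sanity check on the packing (example values, not part of the patch): the stage attempt occupies the high 16 bits and the task attempt the low 16 bits, so the combined ID decomposes cleanly.

val stageAttemptNumber = 2
val taskAttemptNumber = 5
val attemptId = (stageAttemptNumber << 16) | taskAttemptNumber // 131077
assert((attemptId >> 16) == stageAttemptNumber) // recover the stage attempt
assert((attemptId & 0xFFFF) == taskAttemptNumber) // recover the task attempt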
Contributor:

I don't think we should generate an ID this way. We already have a unique ID that is exposed in the Spark UI. I'd much rather make it clear that the TID passed to committers as an attempt ID is the same as the TID in the stage view; that makes debugging easier. Going with this approach just introduces yet another number to track an attempt.

Contributor:

The problem is that the task ID is a long; we can't change the Hadoop API for that, and to me it's quite possible to have a valid task ID > 2^32. It might not be ideal to do it this way, but I think it's a good bug fix, especially for now; we can file a follow-up to improve it if we have ideas or want to change the interface.
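For context, a short sketch of the Hadoop constraint (assuming Hadoop 2.x's org.apache.hadoop.mapreduce.TaskAttemptID; the values below are placeholders):

import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}

// SparkHadoopWriter must construct a Hadoop TaskAttemptID, whose task id and
// attempt id parameters are both ints, so Spark's long TID cannot pass through as-is.
val hadoopAttemptId = new TaskAttemptID("jobtracker", 0, TaskType.MAP, 42, 0)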

@rdblue (Contributor) commented Jun 22, 2018

Okay, that makes sense if this is just for Hadoop attempt IDs. Maybe that's a good thing to put in the comment as well?

s"task $partId.$attemptId is authorized to commit.")
dataWriter.commit()
} else {
val message = s"Stage $stageId / $stageAttempt, " +
val message = s"Stage $stageId.$stageAttempt, " +
Contributor:

Should these logs use TID instead of attempt number? The format used in other log messages is s"Task $taskId (TID $tid)", I think.

Contributor:

(This is for the next line, sorry for the confusion)

@vanzin (Author):

I'll change these log messages a bit. I think the attempt number is still helpful while we don't change the coordinator API (SPARK-24611), and it doesn't hurt to have it there even after we do.

Contributor:

+1 Thanks!

@rdblue (Contributor) commented Jun 22, 2018

+1

@vanzin (Author) commented Jun 22, 2018

A general comment about the log messages: it seems pretty noisy to have logInfo messages for every task (doing so only in the failure paths would be better, in my opinion), but I'm keeping the current log level.

@vanzin (Author) commented Jun 22, 2018

Also, since this patch won't backport cleanly, I'll go ahead and send versions of it for branch-2.3 and branch-2.2 (which I think will be enough to also backport to 2.1).

@tgravescs (Contributor):

Just an FYI: I was looking at backporting to 2.2, and it looks like at least some write calls don't have the issue:
https://github.com/apache/spark/blob/branch-2.2/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L1128

It looks like that was lost when things were refactored.

In fact, there's a test that exercises it; not sure why that was added:
https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala#L325


@SparkQA commented Jun 22, 2018

Test build #92218 has finished for PR 21606 at commit 503852f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs (Contributor):

Yeah, so things like saveAsTextFile in 2.2 are OK, but other functions like saveAsNewAPIHadoopFile and the dataframe writers have the issue, so we do need to backport.

@SparkQA commented Jun 23, 2018

Test build #92225 has finished for PR 21606 at commit 47131c5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 23, 2018

Test build #92235 has finished for PR 21606 at commit e892937.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -125,12 +124,12 @@ object DataWritingSparkTask extends Logging {
    val coordinator = SparkEnv.get.outputCommitCoordinator
    val commitAuthorized = coordinator.canCommit(stageId, stageAttempt, partId, attemptId)
Contributor:

A note for the follow-up: since we decided to use taskId as a unique identifier for write tasks, the output commit coordinator could also use taskId instead of the stage and task attempt numbers.

@vanzin (Author) commented Jun 25, 2018

Cool, so it looks like we can merge this? (And #21615 and #21616?)

@vanzin (Author) commented Jun 25, 2018

Given the deafening silence, I'll merge the PRs myself, since there's a bunch of +1s from others.

@asfgit asfgit closed this in 6d16b98 Jun 25, 2018
@vanzin vanzin deleted the SPARK-24552.2 branch August 24, 2018 19:56
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023

Author: Marcelo Vanzin <vanzin@cloudera.com>
Author: Ryan Blue <blue@apache.org>

Closes apache#21606 from vanzin/SPARK-24552.2.

Ref: LIHADOOP-48531