Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-31484][Core] Add stage attempt number to temp checkpoint filename to avoid file already existing exception #28255

Closed
wants to merge 3 commits into from

Conversation

viirya
Copy link
Member

@viirya viirya commented Apr 18, 2020

What changes were proposed in this pull request?

To avoid file already existing exception when creating checkpoint file, this PR proposes to add stage attempt number to temporary checkpoint file.

Why are the changes needed?

On our production clusters, we have seen checkpointing failure. The failed stage can possibly leave partial written checkpoint file, the task of retried stage to write checkpoint file could fail due toFileAlreadyExistsException when creating the same file, like

org.apache.hadoop.fs.FileAlreadyExistsException: /path_to_checkpoint/rdd-114/.part-03154-attempt-0 for client xxx.xxx.xxx.xxx already exists
org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.startFile(FSDirWriteFileOp.java:359)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2353)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2273)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:728)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:413)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:851)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:794)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2490)

	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
	at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
	at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:270)
	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1263)
	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1205)
	at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:473)
	at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:470)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:470)
	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:411)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:872)
	at org.apache.spark.rdd.ReliableCheckpointRDD$.writePartitionToCheckpointFile(ReliableCheckpointRDD.scala:204)

Does this PR introduce any user-facing change?

Yes. Users won't see checkpoint file already existing exception after this PR.

How was this patch tested?

Add unit test.

@@ -642,4 +643,29 @@ class CheckpointStorageSuite extends SparkFunSuite with LocalSparkContext {
assert(preferredLoc == checkpointedRDD.cachedPreferredLocations.get(partiton))
}
}

test("checkpoint should not fail in retry") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we add SPARK-31484: prefix?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.

@dongjoon-hyun
Copy link
Member

Hi, @viirya . This is registered as an Improvement JIRA instead of Bug. So, this is only for 3.1.0. Did I understand correctly?

@viirya
Copy link
Member Author

viirya commented Apr 18, 2020

@dongjoon-hyun Yes, I think so.

@SparkQA
Copy link

SparkQA commented Apr 19, 2020

Test build #121461 has finished for PR 28255 at commit da210ea.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Copy link
Member Author

viirya commented Apr 19, 2020

retest this please

@SparkQA
Copy link

SparkQA commented Apr 19, 2020

Test build #121465 has finished for PR 28255 at commit da210ea.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Copy link
Member Author

viirya commented Apr 19, 2020

also cc @cloud-fan @HyukjinKwon

@SparkQA
Copy link

SparkQA commented Apr 19, 2020

Test build #121471 has finished for PR 28255 at commit f402180.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

retest this please

@SparkQA
Copy link

SparkQA commented Apr 19, 2020

Test build #121474 has finished for PR 28255 at commit f402180.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM. Thank you, @viirya and @HyukjinKwon .
Merged to master for Apache Spark 3.1.0.

val tempOutputPath =
new Path(outputDir, s".$finalOutputName-attempt-${ctx.attemptNumber()}")
val tempOutputPath = new Path(outputDir,
s".$finalOutputName-attempt-${ctx.stageAttemptNumber()}-${ctx.attemptNumber()}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stageAttemptNumber isn't unique among stages and attemptNumber isn't unique among tasks within the same stage. So it seems that this could still lead to the file name conflict. e.g. task 0.0 from stage 0.0 could conflict with task 1.0 from stage 1.0 (different stage) and task 1.0 from stage 1.0 could conflict with task 2.0 from stage 1.0 (same stage).

I think the unique file format should be ...-stageId-stageAttemptId-taskId-taskAttemptId-....

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A RDD can across stages?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, but all attempts Id starts from 0.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see we suffix the checkpoint path with rdd id...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we just want a unique file name, can we use the task id? It's unique within the Spark application.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean taskAttemptId?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, taskAttemptId, we also use it in the shuffle map file for making the file name unique. #24892 (comment)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Let me create a follow-up for it. Thanks.

HyukjinKwon pushed a commit that referenced this pull request Apr 22, 2020
### What changes were proposed in this pull request?

As suggested by #28255 (comment), this patch proposes to use taskAttemptId in checkpoint filename, instead of stageAttemptNumber + attemptNumber.

### Why are the changes needed?

To simplify checkpoint simplified and unique.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #28289 from viirya/SPARK-31484-followup.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
@viirya viirya deleted the delete-temp-checkpoint branch December 27, 2023 18:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
7 participants