
[SPARK-27194][core] Job failures when task attempts do not clean up spark-staging parquet files #24142

Closed

Conversation

@ajithme (Contributor) commented Mar 19, 2019:

What changes were proposed in this pull request?

Avoid task output file collisions when tasks are reattempted. Currently the file name is derived only from the task id, which is the same across reattempts, so the attempts collide on the same file. Instead, the file name can contain the task id along with the attempt id.
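
A minimal sketch of the idea (illustrative only, not the actual diff in this PR): the shape loosely follows HadoopMapReduceCommitProtocol#getFilename, and jobId stands in for the commit protocol's job UUID.

    import org.apache.hadoop.mapreduce.TaskAttemptContext

    // Sketch only: embed the attempt id in the file name so that reattempts of the
    // same task write to distinct files instead of colliding on one path.
    def getFilename(taskContext: TaskAttemptContext, jobId: String, ext: String): String = {
      val split = taskContext.getTaskAttemptID.getTaskID.getId  // e.g. 200, identical across reattempts
      val attempt = taskContext.getTaskAttemptID.getId          // 0 for the first attempt, 1 for the retry
      // current behaviour uses only the split id: f"part-$split%05d-$jobId$ext"
      f"part-$split%05d-$attempt%03d-$jobId$ext"
    }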

How was this patch tested?

Will add unit tests if the approach is acceptable.

@AmplabJenkins commented:

Can one of the admins verify this patch?

@ajithme (Contributor, Author) commented Mar 19, 2019:

@cloud-fan
@cloud-fan (Contributor) commented:

Can you describe the problem first? AFAIK Spark will write files to a temp staging directory. If a task fails without cleaning up its files, the partially written file will still be there and will be moved to the final directory at the end.

@ajithme (Contributor, Author) commented Mar 19, 2019:

@cloud-fan In InsertIntoHadoopFsRelationCommand, when dynamicPartitionOverwrite is true, org.apache.spark.internal.io.HadoopMapReduceCommitProtocol#newTaskTempFile chooses the partition directory for the output file. But when a task is reattempted because of executor loss and a new executor is launched on the same machine, the output goes to the same path and the same file, and the write breaks. Refer to https://issues.apache.org/jira/browse/SPARK-27194 for the stack trace. From the logs, task 200.0 and its reattempt 200.1 both expect the file name part-00200-blah-blah.c000.snappy.parquet (refer to org.apache.spark.internal.io.HadoopMapReduceCommitProtocol#getFilename). As only the task id is used in the file name, this causes the conflict (only when the new executor and the reattempted task are launched on the same machine). Please correct me if I am wrong.
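
To illustrate the collision (values mirror the JIRA log; the staging layout below is assumed for illustration, not copied from Spark source):

    // Illustration only: with dynamic partition overwrite, both attempts of task 200
    // resolve to the same staging file, because the name depends only on the task id.
    val stagingDir   = ".spark-staging-<jobId>"                 // per-query staging root
    val partitionDir = "part1=2/part2=2"                        // dynamic partition directory
    val fileName     = "part-00200-<jobId>.c000.snappy.parquet" // derived from the task id only
    val attempt0Path = s"$stagingDir/$partitionDir/$fileName"   // written by task 200.0
    val attempt1Path = s"$stagingDir/$partitionDir/$fileName"   // task 200.1 targets the identical path
    assert(attempt0Path == attempt1Path)                        // hence the failure on retry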

@ajithme changed the title from "[SPARK-27194] Job failures when task attempts do not clean up spark-staging parquet files" to "[SPARK-27194][core] Job failures when task attempts do not clean up spark-staging parquet files" on Mar 19, 2019
@cloud-fan (Contributor) commented:

If a task fails without cleaning up its files, the partially written file will still be there and will be moved to the final directory at the end.

Is this a problem, or did you fix it as well?

@ajithme (Contributor, Author) commented Mar 19, 2019:

If a task fails without cleaning up its files, the partially written file will still be there and will be moved to the final directory at the end.

Is this a problem, or did you fix it as well?

The fix does not change the behavior of writing to a temp location and then moving to the final location. It only deals with the file name (even in the temp/final location, file names must not conflict).

@rezasafi (Contributor) commented:

I am just worried that this fix causes duplicates.
Isn't it better to call datawriter.abort() before raising the exception here:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L257
changing it to

case t: Throwable =>
  datawriter.abort()
  throw new SparkException("Task failed while writing rows.", t)

@vanzin (Contributor) commented Mar 19, 2019:

Without looking (again) into this code, I have the same question about this potentially causing duplicates in the case where an executor fails.

Isn't it better to call datawriter.abort() before raising the exception here:

No, because the executor may die before calling that, and the code needs to be resilient to that. (Also, as far as I can tell, that's already being done in the call to tryWithSafeFinallyAndFailureCallbacks.)

@vanzin (Contributor) commented Mar 19, 2019:

The fix does not change the behavior of writing to a temp location and then moving to the final location. It only deals with the file name (even in the temp/final location, file names must not conflict).

You seem to be saying that with your fix, it's possible that the partially written file will end up in the final output.

If that's true, then that's worse than the current status: right now, your app will fail. With that partial fix, the app will generate wrong data.

@ajithme (Contributor, Author) commented Mar 20, 2019:

I tried this with Spark 2.3.3 and

spark.sql.sources.partitionOverwriteMode=DYNAMIC

Steps:

create table t1 (i int, part1 int, part2 int) using parquet partitioned by (part1, part2)
insert into t1 partition(part1=1, part2=1) select 1
insert overwrite table t1 partition(part1=1, part2=1) select 2
insert overwrite table t1 partition(part1=2, part2) select 2, 2  // here executor is killed and task respawns

and here is the full stack trace:

2019-03-20 19:58:06 WARN  TaskSetManager:66 - Lost task 0.1 in stage 2.0 (TID 3, QWERTY, executor 2): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: /user/hive/warehouse/t2/.spark-staging-1f1efbfd-7e20-4e0f-a49c-a7fa3eae4cb1/part1=2/part2=2/part-00000-1f1efbfd-7e20-4e0f-a49c-a7fa3eae4cb1.c000.snappy.parquet for client 127.0.0.1 already exists
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2578)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2465)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2349)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:624)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:398)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2213)

	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
	at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
	at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1653)
	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1689)
	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1624)
	at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:448)
	at org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:444)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:459)
	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:387)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:236)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:342)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask.org$apache$spark$sql$execution$datasources$FileFormatWriter$DynamicPartitionWriteTask$$newOutputWriter(FileFormatWriter.scala:511)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask$$anonfun$execute$5.apply(FileFormatWriter.scala:546)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask$$anonfun$execute$5.apply(FileFormatWriter.scala:527)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask.execute(FileFormatWriter.scala:527)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:267)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1415)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
	... 8 more
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.fs.FileAlreadyExistsException): /user/hive/warehouse/t2/.spark-staging-1f1efbfd-7e20-4e0f-a49c-a7fa3eae4cb1/part1=2/part2=2/part-00000-1f1efbfd-7e20-4e0f-a49c-a7fa3eae4cb1.c000.snappy.parquet for client 127.0.0.1 already exists
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:2578)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2465)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2349)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:624)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:398)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2213)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2213)

	at org.apache.hadoop.ipc.Client.call(Client.java:1475)
	at org.apache.hadoop.ipc.Client.call(Client.java:1412)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
	at com.sun.proxy.$Proxy15.create(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:296)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy16.create(Unknown Source)
	at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1648)
	... 32 more

@ajithme (Contributor, Author) commented Mar 20, 2019:

So, as we can see from the stack trace, when spark.sql.sources.partitionOverwriteMode=DYNAMIC is set and overwrite is enabled in InsertIntoHadoopFsRelationCommand,

    val enableDynamicOverwrite =
      sparkSession.sessionState.conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
    // This config only makes sense when we are overwriting a partitioned dataset with dynamic
    // partition columns.
    val dynamicPartitionOverwrite = enableDynamicOverwrite && mode == SaveMode.Overwrite &&
      staticPartitions.size < partitionColumns.length

we write directly to the staging location via DynamicPartitionWriteTask, which fails when it tries to create the output file because of the leftover file from the previous attempt. That is, org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter#recordWriter always calls org.apache.parquet.hadoop.ParquetOutputFormat#getRecordWriter(Configuration, Path, org.apache.parquet.hadoop.metadata.CompressionCodecName), which always constructs an org.apache.parquet.hadoop.ParquetFileWriter with org.apache.parquet.hadoop.ParquetFileWriter.Mode#CREATE.

So for the retried task, getRecordWriter will never succeed.
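
Roughly, the call the retried task ends up making looks like this (illustrative only, not verbatim Spark source; the overload is the one named above):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.hadoop.ParquetOutputFormat
    import org.apache.parquet.hadoop.metadata.CompressionCodecName

    // Illustration only: every attempt passes the same staging path, and parquet-mr
    // opens the file in CREATE mode, so the retry hits FileAlreadyExistsException
    // instead of overwriting the leftover file from the failed attempt.
    def openWriter[T](format: ParquetOutputFormat[T], conf: Configuration, stagingFile: Path) =
      format.getRecordWriter(conf, stagingFile, CompressionCodecName.SNAPPY)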

@ajithme (Contributor, Author) commented Mar 20, 2019:

ping @vanzin @dongjoon-hyun @cloud-fan @rezasafi
I agree with the duplicate-data concern mentioned here. Please suggest if there is a better way to handle this. One option I can think of is removing the file if it already exists before calling getRecordWriter.
Deleting from the staging dir should not have side effects, right?
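
A rough sketch of that suggestion (the replies below argue against it), assuming an HDFS-backed Path; purely illustrative:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    // Sketch of "delete the leftover before creating"; not a recommended fix, see below.
    def deleteLeftoverIfExists(stagingFile: Path, conf: Configuration): Unit = {
      val fs = stagingFile.getFileSystem(conf)
      if (fs.exists(stagingFile)) {
        fs.delete(stagingFile, /* recursive = */ false) // drop the file left by the failed attempt
      }
      // ... then proceed with getRecordWriter as before
    }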

@vanzin (Contributor) commented Mar 20, 2019:

I don't think deleting the old file is a good idea. There might be edge cases where the failed task still has the file open, and deleting it from another place may not work as expected.

(The name conflict also means that speculation is probably completely broken when this option is used. And speculation working would mean you have two files with the output of the same task, and only one of them should end up in the final output.)

@ajithme (Contributor, Author) commented Mar 20, 2019:

There might be edge cases where the failed task still has the file open, and deleting it from another place may not work as expected.

But in this case, since the executor has exited, wouldn't the lease on the remote file expire, letting the retry task delete it successfully? (The behaviour may differ if the filesystem is not HDFS.)

@vanzin (Contributor) commented Mar 20, 2019:

Do you want to rely on specific behaviors of file systems and executors, or write code that doesn't depend on how they behave?

And regardless of that, just deleting the file would still cause speculation to generate bad data. So it's not a fix.

@ajithme (Contributor, Author) commented Mar 20, 2019:

@vanzin thanks for the clarification; I was just trying to confirm that deleting is indeed a bad idea.

Okay, I see the point. Currently, in org.apache.spark.internal.io.HadoopMapReduceCommitProtocol#commitJob, when dynamicPartitionOverwrite is true, we do this:

fs.rename(new Path(stagingDir, part), finalPartPath)

This is where we end up with both the old and the new task files, causing duplicates. Would it be better if we could exclude the files of old or speculated task attempts from this operation?
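
For context, the job-commit step being discussed looks roughly like this (hedged sketch, not verbatim commitJob code): with dynamic partition overwrite, each staging partition directory is renamed wholesale into the final table location, including any files left behind by failed or speculated attempts.

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Hedged sketch of the rename quoted above, wrapped for illustration.
    def moveStagedPartitions(fs: FileSystem, stagingDir: Path, outputPath: Path,
                             partitionPaths: Seq[String]): Unit = {
      partitionPaths.foreach { part =>                        // e.g. "part1=2/part2=2"
        val finalPartPath = new Path(outputPath, part)
        fs.delete(finalPartPath, /* recursive = */ true)      // overwrite: drop the old partition data
        fs.rename(new Path(stagingDir, part), finalPartPath)  // moves everything under staging, dead attempts included
      }
    }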

@vanzin (Contributor) commented Mar 20, 2019:

You seem to be trying to create a new commit protocol that a single task can perform without regard for what other tasks are doing. That is just not going to work.

@ajithme (Contributor, Author) commented Mar 20, 2019:

You seem to be trying to create a new commit protocol that a single task can perform without regard for what other tasks are doing. That is just not going to work.

No. I am trying to point out the difference between dynamicPartitionOverwrite being false vs. true. When it is false, the list of files is iterated and each file is moved; when it is true, the entire folder is simply renamed. Task commits are still independent, but the job commit should take task status into account. Since retried tasks must not produce overlapping output, we could have given each retry its own file and eliminated duplicate task outputs at job commit. My point is that, in the current code, moving the entire staging-dir partition output to the final location regardless of its contents (i.e. whether any tasks were retried) is not a good approach; it only works because retried tasks write to the same file name, which incidentally avoids duplicates.
Sorry if the approach was wrong. My concern here is that a task which has exited abnormally should not block or fail the retried task; otherwise the whole point of retrying is lost.

Please suggest a better approach to tackle the problem here if you see one.

@vanzin (Contributor) commented Mar 20, 2019:

If I had an approach for fixing this, I'd have opened a PR.

I'm just trying to point out that your approach has issues. Any suggested approach needs to create the correct output regardless of task timing, task failures, or speculation.

And none of the approaches you suggested so far fit that, as far as I can tell.

@ajithme (Contributor, Author) commented Mar 20, 2019:

Okay, will check for a better approach. Thanks for the inputs

@rezasafi (Contributor) commented:

@ajithme I was just wondering whether you are still working on this? Thank you very much.

@vanzin (Contributor) commented May 2, 2019:

I'll close this for now. It can be reopened by just updating the branch.

@vanzin closed this May 2, 2019
@LantaoJin (Contributor) commented:

@vanzin we face this problem too. After a quick investigation, I think it may be a Parquet bug: it hardcodes Mode.CREATE when constructing a ParquetFileWriter (https://github.com/apache/parquet-mr/blame/47398be76cfb6634000532e9432430c4676442dd/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java#L420). In this case, we would need to propagate Mode.OVERWRITE through the getRecordWriter API.
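
A hedged sketch of what propagating the mode could look like (parquet-mr's ParquetFileWriter does expose an OVERWRITE mode; the constructor used here is simplified and the surrounding plumbing is assumed):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.hadoop.ParquetFileWriter
    import org.apache.parquet.schema.MessageType

    // Sketch only: where ParquetOutputFormat currently hardcodes Mode.CREATE, a fix would
    // thread the mode through so callers (e.g. Spark task retries) could request OVERWRITE.
    def newFileWriter(conf: Configuration, schema: MessageType, file: Path): ParquetFileWriter =
      new ParquetFileWriter(conf, schema, file, ParquetFileWriter.Mode.OVERWRITE)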

@advancedxy (Contributor) commented:

Hi @vanzin, @ajithme and @cloud-fan, I am also interested in this problem.
I'd like to propose a fix here (using a partitioned table as the example; the non-partitioned case is similar), and I'd like to know your thoughts (a rough sketch follows the list):

  1. HadoopMapReduceCommitProtocol.newTaskTempFile now returns /$stageDir/$partitionSpecs-$taskAttemptNum/$fileName
  2. commitTask should move the files under /$stageDir/$partitionSpecs-$taskAttemptNum to /$stageDir/$partitionSpec. Due to a host going down or executor preemption, there may already be files under /$stageDir/$partitionSpec that were moved by another task attempt but never committed. Since we assume a task's output is idempotent, we can simply skip moving files that already exist. Once a task is committed, all of the files it produced will have been moved to the corresponding partition dirs.
  3. commitJob stays the same: just move /$stageDir/$partitionSpec to the final dir.
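
A rough sketch of the proposal above, assuming HDFS-like rename/exists semantics; the names and staging layout are illustrative, not the actual HadoopMapReduceCommitProtocol code:

    import org.apache.hadoop.fs.{FileSystem, Path}

    // 1. Each attempt writes under its own directory, so attempts never collide:
    //    /$stageDir/$partitionSpec-$attemptNum/$fileName
    def newTaskTempFile(stageDir: Path, partitionSpec: String, attemptNum: Int, fileName: String): Path =
      new Path(stageDir, s"$partitionSpec-$attemptNum/$fileName")

    // 2. commitTask moves the attempt's files into /$stageDir/$partitionSpec, skipping files that a
    //    previous (uncommitted) attempt already left there, relying on task output being idempotent.
    def commitTaskFiles(fs: FileSystem, attemptDir: Path, partitionDir: Path): Unit = {
      fs.mkdirs(partitionDir)
      fs.listStatus(attemptDir).foreach { status =>
        val target = new Path(partitionDir, status.getPath.getName)
        if (!fs.exists(target)) {
          fs.rename(status.getPath, target) // first mover wins; leftovers from dead attempts are skipped
        }
      }
    }
    // 3. commitJob is unchanged: rename /$stageDir/$partitionSpec to the final partition dir.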
