[SPARK-55495][CORE] Fix EventLogFileWriters.closeWriter to handle checkError
#54280
base: master
Conversation
Could you review this PR, @HeartSaVioR, @HyukjinKwon, @viirya, @yaooqinn, @LuciferYang, @peter-toth, @pan3793?
core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
Thank you, @pan3793. If there is any issue for any known downstream project, we may want to change it more conservatively later, like the following. In this alternative, there is no side effect for the normal successful case.
if (writer.exists(_.checkError())) {
  logWarning("Spark detects errors while closing event logs.")
  hadoopDataStream.foreach(_.close())
}
@dongjoon-hyun, yeah, I feel the alternative is better. The change makes sense to me, soft +1, because I am not experienced with file systems at large scale other than HDFS; better to let others have a look too.
Thank you for your thoughtful feedback, @pan3793. 😄 I updated this PR with the alternative, too.
hadoopDataStream.foreach(_.hflush())
// 2. Try to close and check the errors
writer.foreach(_.close())
Catch exceptions from close() directly to ensure the fallback runs?
In the case of HDFS, it should presumably not throw an exception; instead, a boolean status would be set to true. However, I'm not sure whether third-party libraries will adhere to this convention.
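For background, here is a minimal, self-contained sketch (plain java.io, not the PR's code) of why a try/catch around close() alone would not see the failure: java.io.PrintWriter swallows IOExceptions from the underlying stream and only records them in the flag that checkError() reads back.

import java.io.{IOException, OutputStream, PrintWriter}

object CheckErrorDemo {
  def main(args: Array[String]): Unit = {
    // An output stream that always fails on close, standing in for a broken
    // Hadoop/HDFS stream.
    val failing = new OutputStream {
      override def write(b: Int): Unit = ()
      override def close(): Unit = throw new IOException("simulated close failure")
    }

    val writer = new PrintWriter(failing)
    writer.println("event")

    // close() does NOT propagate the IOException; PrintWriter catches it and
    // sets an internal error flag instead.
    writer.close()

    // checkError() is the only way to observe that the close actually failed.
    println(s"checkError = ${writer.checkError()}") // expected: checkError = true
  }
}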
// 2. Try to close and check the errors
writer.foreach(_.close())
if (writer.exists(_.checkError())) {
  logWarning("Spark detects errors while closing event logs.")
Use structured logging?
This follows the structured logging recommendation for constant string messages. Did I miss something, @yaooqinn?
* Constant String Messages:
*   If you are logging a constant string message, use the log methods that accept a constant
*   string.
* <p>
*
*   logInfo("StateStore stopped")
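As a concrete illustration of that recommendation, a minimal sketch assuming only the public org.apache.spark.internal.Logging trait (the class name here is hypothetical):

import org.apache.spark.internal.Logging

// Hypothetical class, for illustration only.
class EventLogCloseReporter extends Logging {
  def reportCloseError(failed: Boolean): Unit = {
    if (failed) {
      // The message is a constant string, so the plain String overload is the
      // recommended form; no structured-logging MDC keys are needed here.
      logWarning("Spark detects errors while closing event logs.")
    }
  }
}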
override def close(): Unit = {
  if (throwOnClose) {
    throw new IOException("Simulated close error")
Shall we match the synthetic error message in the tests to make sure that we capture the right one?
The test case matches the warning message from Spark's code, like the following, instead of this test suite's string.
assert(warningMessages.exists(_.contains("Spark detects errors while flushing")),
assert(warningMessages.exists(_.contains("Spark detects errors while closing")),
if (writer.exists(_.checkError())) {
  logWarning("Spark detects errors while flushing event logs.")
}
hadoopDataStream.foreach(_.hflush())
hadoopDataStream.foreach(_.hflush()) can throw an unhandled IOException; shall we wrap it in a try-catch here? Maybe we can leverage something like Utils.logIOError for all fallback paths.
This PR aims to avoid a silent failure inside checkError. For other propagatable unhandled IOExceptions, SparkContext.stop already logs them like the following, so I intentionally didn't use try ... catch ... or Utils.tryLog....
spark/core/src/main/scala/org/apache/spark/SparkContext.scala
Lines 2380 to 2382 in 59f3a16
Utils.tryLogNonFatalError {
  _eventLogger.foreach(_.stop())
}
spark/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
Lines 356 to 363 in 59f3a16
override def stop(): Unit = {
  closeWriter()
  val appStatusPathIncomplete = getAppStatusFilePath(logDirForAppPath, appId, appAttemptId,
    inProgress = true)
  val appStatusPathComplete = getAppStatusFilePath(logDirForAppPath, appId, appAttemptId,
    inProgress = false)
  renameFile(appStatusPathIncomplete, appStatusPathComplete, overwrite = true)
}
In my case, the silent failure happens before renameFile, so the last log file is not uploaded correctly and the .inprogress file remains. As a result, SHS always shows running stages because the application is never marked as finished.
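To make that propagation concrete, a hedged sketch assuming Spark's internal Utils.tryLogNonFatalError from the SparkContext.stop excerpt above (Utils is private[spark], so this only compiles inside Spark's own source tree; the throwing stop() is a stand-in for _eventLogger.foreach(_.stop()), not the real writer):

package org.apache.spark  // required because Utils is private[spark]

import java.io.IOException

import org.apache.spark.util.Utils

object StopPathSketch {
  // Stand-in for an event logger whose hflush/close throws an IOException.
  private def stop(): Unit = throw new IOException("hflush failed")

  def main(args: Array[String]): Unit = {
    // The IOException propagates out of stop() and is caught and logged here,
    // which is why closeWriter itself does not add another try/catch.
    Utils.tryLogNonFatalError {
      stop()
    }
  }
}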
Thank you for your review comments, @LuciferYang and @yaooqinn.
yaooqinn
left a comment
LGTM, thank you for the patch and explanation
What changes were proposed in this pull request?
This PR aims to fix EventLogFileWriters.closeWriter to handle checkError. In general, we need the following three steps (see the sketch after this list).
1. flush first before closing to isolate any problems at this layer.
2. Try PrintWriter.close and fall back to the underlying Hadoop file stream's close API.
3. Show a warning when checkError returns true.
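As referenced above, here is a hedged sketch of what the revised closeWriter roughly looks like when the diff fragments quoted in the review threads are put together (the field names writer and hadoopDataStream follow those fragments; this is an illustration, not the exact merged code):

def closeWriter(): Unit = {
  // 1. Check for flush errors first (checkError also flushes the PrintWriter),
  //    then flush the underlying Hadoop stream.
  if (writer.exists(_.checkError())) {
    logWarning("Spark detects errors while flushing event logs.")
  }
  hadoopDataStream.foreach(_.hflush())

  // 2. Try to close and check the errors.
  writer.foreach(_.close())
  if (writer.exists(_.checkError())) {
    logWarning("Spark detects errors while closing event logs.")
    // 3. Fall back to the underlying Hadoop stream's close API.
    hadoopDataStream.foreach(_.close())
  }
}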
Why are the changes needed?
Currently, Apache Spark's event log writer naively invokes PrintWriter.close() without error handling.
spark/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
Line 80 in 4e1cb88
spark/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala
Lines 133 to 135 in 4e1cb88
However, the Java community recommends using checkError in the case of PrintWriter.flush and PrintWriter.close.
When checkError returns true, a user can lose their event log; for example, the event log silently fails to upload. Spark should show a proper warning and at least make a best effort to flush or close the underlying Hadoop file streams.
Does this PR introduce any user-facing change?
No, this is a bug fix for a corner case.
How was this patch tested?
Pass the CIs with the newly added test cases.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Opus 4.5 on Claude Code