Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.streaming.state

import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOException}
import java.nio.channels.ClosedChannelException
import java.util.Locale

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -202,13 +203,22 @@ private[state] class HDFSBackedStateStoreProvider(
/** Abort all the updates made on this store. This store will not be usable any more. */
override def abort(): Unit = {
verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed")
try {
state = ABORTED
if (tempDeltaFileStream != null) {
tempDeltaFileStream.close()
}
if (tempDeltaFile != null) {
fs.delete(tempDeltaFile, true)
}
} catch {
case c: ClosedChannelException =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need two cases? The error message is same, and the exception is also in the log.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its debug though for the expected case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it should be a warning? In this case, the task will fail and it hurts nothing to output a warning but will be helpful when we have other issues.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha!

// This can happen when underlying file output stream has been closed before the
// compression stream.
logDebug(s"Error aborting version $newVersion into $this", c)

state = ABORTED
if (tempDeltaFileStream != null) {
tempDeltaFileStream.close()
}
if (tempDeltaFile != null) {
fs.delete(tempDeltaFile, true)
case e: Exception =>
logWarning(s"Error aborting version $newVersion into $this", e)
}
logInfo(s"Aborted version $newVersion for $this")
}
Expand Down