Skip to content

Commit

Permalink
SPARK-27330 review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Eyal Zituny committed Aug 15, 2019
1 parent 9e8f2a4 commit b8d6aee
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ class ForeachDataWriter[T](
writer.close(errorOrNull)
} catch {
case t: Throwable =>
logWarning("Failed closing Foreach writer", t)
throw t
} finally {
closeCalled = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,34 @@ class ForeachWriterSuite extends StreamTest with SharedSQLContext with BeforeAnd
query.stop()
}
}

// Checks how the sink reacts when the failure originates upstream of the writer: the `map`
// stage divides by zero, and the test expects the writer to record only `Open` followed by a
// `Close` that carries the abort error.
testQuietly("foreach with error not caused by ForeachWriter") {
  withTempDir { checkpointDir =>
    val source = MemoryStream[Int]
    val failingRecords = source.toDS().repartition(1).map(_ / 0)
    val streamingQuery = failingRecords.writeStream
      .option("checkpointLocation", checkpointDir.getCanonicalPath)
      .foreach(new TestForeachWriter)
      .start()
    source.addData(1, 2, 3, 4)

    // Draining the input must surface the task failure as a StreamingQueryException.
    val failure = intercept[StreamingQueryException] {
      streamingQuery.processAllAvailable()
    }

    val jobFailure = failure.getCause
    assert(jobFailure.isInstanceOf[SparkException])
    assert(jobFailure.getCause.getCause.getMessage === "/ by zero")
    assert(streamingQuery.isActive === false)

    // A single partition is expected, holding exactly the `Open` and `Close` events.
    val recorded = ForeachWriterSuite.allEvents()
    assert(recorded.size === 1)
    val partitionEvents = recorded(0)
    assert(partitionEvents(0) === ForeachWriterSuite.Open(partition = 0, version = 0))

    // `close` must have been invoked with the abort error rather than with no error.
    val closeEvent = partitionEvents(1).asInstanceOf[ForeachWriterSuite.Close]
    val closeError = closeEvent.error.get
    assert(closeError.isInstanceOf[SparkException])
    assert(closeError.getMessage ===
      "Foreach writer has been aborted due to a task failure")
  }
}
}

/** A global object to collect events in the executor */
Expand Down Expand Up @@ -291,71 +319,6 @@ object ForeachWriterSuite {
}
}

/**
 * Tests the abort path of the foreach sink.
 *
 * The suite builds its own SparkContext so that the driver's OutputCommitCoordinator can be
 * replaced by a Mockito spy, which the test then stubs to fail on `canCommit`.
 */
class ForeachWriterAbortSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {

  import testImplicits._

  // Assigned while the SparkContext builds its SparkEnv (see `createSparkSession`).
  var mockOutputCommitCoordinator: OutputCommitCoordinator = null

  /** Local two-thread configuration with output commit coordination switched on. */
  override protected def sparkConf = {
    val appName = classOf[ForeachWriterAbortSuite].getSimpleName
    new SparkConf()
      .setMaster("local[2]")
      .setAppName(appName)
      .set("spark.hadoop.outputCommitCoordination.enabled", "true")
  }

  /** Creates a session whose driver environment is wired to the spied commit coordinator. */
  override protected def createSparkSession: TestSparkSession = {
    SparkSession.cleanupAnyExistingSession()
    val spiedContext = new SparkContext(sparkConf) {
      override private[spark] def createSparkEnv(
          conf: SparkConf,
          isLocal: Boolean,
          listenerBus: LiveListenerBus): SparkEnv = {
        // A spy (not a plain mock) keeps the real coordinator behaviour everywhere except
        // for the calls a test explicitly stubs.
        val coordinator = spy(new OutputCommitCoordinator(conf, isDriver = true))
        mockOutputCommitCoordinator = coordinator
        val driverCores = SparkContext.numDriverCores(master)
        SparkEnv.createDriverEnv(conf, isLocal, listenerBus, driverCores, Some(coordinator))
      }
    }

    new TestSparkSession(spiedContext)
  }

  testQuietly("foreach with abort") {
    // Make every commit request to the coordinator blow up.
    when(mockOutputCommitCoordinator.canCommit(any(), any(), any(), any()))
      .thenThrow(new RuntimeException("ForeachSinkSuite error"))

    withTempDir { checkpointDir =>
      val source = MemoryStream[Int]
      val streamingQuery = source.toDS().repartition(1).writeStream
        .option("checkpointLocation", checkpointDir.getCanonicalPath)
        .foreach(new TestForeachWriter())
        .start()
      source.addData(1, 2, 3, 4)

      // The failure thrown by the stubbed coordinator should fail the streaming query.
      val failure = intercept[StreamingQueryException] {
        streamingQuery.processAllAvailable()
      }
      val jobFailure = failure.getCause
      assert(jobFailure.isInstanceOf[SparkException])
      assert(jobFailure.getCause.getCause.getMessage === "ForeachSinkSuite error")
      assert(streamingQuery.isActive === false)

      // One partition is expected: `Open`, then one `Process` per input value, then `Close`.
      val recorded = ForeachWriterSuite.allEvents()
      assert(recorded.size === 1)
      val partitionEvents = recorded(0)
      assert(partitionEvents(0) === ForeachWriterSuite.Open(partition = 0, version = 0))
      (1 to 4).foreach { i =>
        assert(partitionEvents(i) === ForeachWriterSuite.Process(value = i))
      }

      // `close` must receive an error whose message mentions the abort.
      val closeEvent = partitionEvents(5).asInstanceOf[ForeachWriterSuite.Close]
      val closeError = closeEvent.error.get
      assert(closeError.isInstanceOf[SparkException])
      assert(closeError.getMessage.contains("aborted"))
    }
  }
}

/** A [[ForeachWriter]] that writes collected events to [[ForeachWriterSuite]] */
class TestForeachWriter extends ForeachWriter[Int] {
ForeachWriterSuite.clear()
Expand Down

0 comments on commit b8d6aee

Please sign in to comment.