[SPARK-18337] Complete mode memory sinks should be able to recover from checkpoints

## What changes were proposed in this pull request?

It would be nice if memory sinks could also recover from checkpoints. For correctness reasons, the only output mode in which we should support this is `Complete`, because in that mode the full output of the StateStore is already persisted in the checkpoint directory.
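
For illustration only (not part of this patch): a minimal sketch of the user-facing behavior this enables. The `SparkSession` setup, socket source, host/port, query name, and checkpoint path below are placeholders chosen for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

val spark = SparkSession.builder().appName("memory-sink-recovery").getOrCreate()

// A Complete-mode aggregation: the whole result is kept in the StateStore,
// which is what makes rebuilding the in-memory table from a checkpoint safe.
val counts = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()               // a single string column named "value"
  .groupBy("value")
  .count()

// Before this patch, starting a memory-sink query against an existing checkpoint
// failed with "does not support recovering from checkpoint location". With the
// patch, this is allowed in Complete mode: restarting with the same
// checkpointLocation rebuilds the "word_counts" table from the persisted state.
val query = counts.writeStream
  .format("memory")
  .queryName("word_counts")
  .outputMode(OutputMode.Complete())
  .option("checkpointLocation", "/tmp/word_counts_chk")
  .start()

spark.sql("SELECT * FROM word_counts").show()
```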

## How was this patch tested?

Unit test

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #15801 from brkyvz/mem-stream.
brkyvz authored and tdas committed Nov 15, 2016
1 parent de545e7 commit e2452c6
Showing 2 changed files with 69 additions and 2 deletions.
@@ -222,14 +222,16 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
       val sink = new MemorySink(df.schema, outputMode)
       val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
+      val chkpointLoc = extraOptions.get("checkpointLocation")
+      val recoverFromChkpoint = chkpointLoc.isDefined && outputMode == OutputMode.Complete()
       val query = df.sparkSession.sessionState.streamingQueryManager.startQuery(
         extraOptions.get("queryName"),
-        extraOptions.get("checkpointLocation"),
+        chkpointLoc,
         df,
         sink,
         outputMode,
         useTempCheckpointLocation = true,
-        recoverFromCheckpointLocation = false,
+        recoverFromCheckpointLocation = recoverFromChkpoint,
         trigger = trigger)
       resultDf.createOrReplaceTempView(query.name)
       query
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.streaming.test
 
+import java.io.File
 import java.util.concurrent.TimeUnit
 
 import scala.concurrent.duration._
@@ -467,4 +468,68 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     val sq = df.writeStream.format("console").start()
     sq.stop()
   }
+
+  test("MemorySink can recover from a checkpoint in Complete Mode") {
+    import testImplicits._
+    val ms = new MemoryStream[Int](0, sqlContext)
+    val df = ms.toDF().toDF("a")
+    val checkpointLoc = newMetadataDir
+    val checkpointDir = new File(checkpointLoc, "offsets")
+    checkpointDir.mkdirs()
+    assert(checkpointDir.exists())
+    val tableName = "test"
+    def startQuery: StreamingQuery = {
+      df.groupBy("a")
+        .count()
+        .writeStream
+        .format("memory")
+        .queryName(tableName)
+        .option("checkpointLocation", checkpointLoc)
+        .outputMode("complete")
+        .start()
+    }
+    // no exception here
+    val q = startQuery
+    ms.addData(0, 1)
+    q.processAllAvailable()
+    q.stop()
+
+    checkAnswer(
+      spark.table(tableName),
+      Seq(Row(0, 1), Row(1, 1))
+    )
+    spark.sql(s"drop table $tableName")
+    // verify table is dropped
+    intercept[AnalysisException](spark.table(tableName).collect())
+    val q2 = startQuery
+    ms.addData(0)
+    q2.processAllAvailable()
+    checkAnswer(
+      spark.table(tableName),
+      Seq(Row(0, 2), Row(1, 1))
+    )
+
+    q2.stop()
+  }
+
+  test("append mode memory sink's do not support checkpoint recovery") {
+    import testImplicits._
+    val ms = new MemoryStream[Int](0, sqlContext)
+    val df = ms.toDF().toDF("a")
+    val checkpointLoc = newMetadataDir
+    val checkpointDir = new File(checkpointLoc, "offsets")
+    checkpointDir.mkdirs()
+    assert(checkpointDir.exists())
+
+    val e = intercept[AnalysisException] {
+      df.writeStream
+        .format("memory")
+        .queryName("test")
+        .option("checkpointLocation", checkpointLoc)
+        .outputMode("append")
+        .start()
+    }
+    assert(e.getMessage.contains("does not support recovering"))
+    assert(e.getMessage.contains("checkpoint location"))
+  }
 }
