Skip to content

Commit

Permalink
Output a warning when writing QueueInputDStream and throw a better ex…
Browse files Browse the repository at this point in the history
…ception when reading QueueInputDStream
  • Loading branch information
zsxwing committed Sep 6, 2015
1 parent 0349b5b commit 847cfa8
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ object CheckpointReader extends Logging {

// Try to read the checkpoint files in the order
logInfo("Checkpoint files found: " + checkpointFiles.mkString(","))
val compressionCodec = CompressionCodec.createCodec(conf)
var readError: Exception = null
checkpointFiles.foreach(file => {
logInfo("Attempting to load checkpoint from file " + file)
try {
Expand All @@ -330,13 +330,15 @@ object CheckpointReader extends Logging {
return Some(cp)
} catch {
case e: Exception =>
readError = e
logWarning("Error reading checkpoint from file " + file, e)
}
})

// If none of checkpoint files could be read, then throw exception
if (!ignoreReadError) {
throw new SparkException(s"Failed to read checkpoint from directory $checkpointPath")
throw new SparkException(
s"Failed to read checkpoint from directory $checkpointPath", readError)
}
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.streaming.dstream

import java.io.{NotSerializableException, ObjectOutputStream}
import java.io.{NotSerializableException, ObjectInputStream, ObjectOutputStream}

import scala.collection.mutable.{ArrayBuffer, Queue}
import scala.reflect.ClassTag
Expand All @@ -37,8 +37,13 @@ class QueueInputDStream[T: ClassTag](

override def stop() { }

private def readObject(in: ObjectInputStream): Unit = {
throw new NotSerializableException("queueStream doesn't support checkpointing. " +
"Please don't use queueStream when checkpointing is enabled.")
}

private def writeObject(oos: ObjectOutputStream): Unit = {
throw new NotSerializableException("queueStream doesn't support checkpointing")
logWarning("queueStream doesn't support checkpointing")
}

override def compute(validTime: Time): Option[RDD[T]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.scalatest.concurrent.Timeouts
import org.scalatest.exceptions.TestFailedDueToTimeoutException
import org.scalatest.time.SpanSugar._

import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark._
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.metrics.source.Source
import org.apache.spark.storage.StorageLevel
Expand Down Expand Up @@ -726,16 +726,26 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
}

test("queueStream doesn't support checkpointing") {
val checkpointDir = Utils.createTempDir()
ssc = new StreamingContext(master, appName, batchDuration)
val rdd = ssc.sparkContext.parallelize(1 to 10)
ssc.queueStream[Int](Queue(rdd)).print()
ssc.checkpoint(checkpointDir.getAbsolutePath)
val e = intercept[NotSerializableException] {
ssc.start()
val checkpointDirectory = Utils.createTempDir().getAbsolutePath()
def creatingFunction(): StreamingContext = {
val _ssc = new StreamingContext(conf, batchDuration)
val rdd = _ssc.sparkContext.parallelize(1 to 10)
_ssc.checkpoint(checkpointDirectory)
_ssc.queueStream[Int](Queue(rdd)).register()
_ssc
}
ssc = StreamingContext.getOrCreate(checkpointDirectory, creatingFunction _)
ssc.start()
eventually(timeout(10000 millis)) {
assert(Checkpoint.getCheckpointFiles(checkpointDirectory).size > 1)
}
ssc.stop()
val e = intercept[SparkException] {
ssc = StreamingContext.getOrCreate(checkpointDirectory, creatingFunction _)
}
// StreamingContext.validate changes the message, so use "contains" here
assert(e.getMessage.contains("queueStream doesn't support checkpointing"))
assert(e.getCause.getMessage.contains("queueStream doesn't support checkpointing. " +
"Please don't use queueStream when checkpointing is enabled."))
}

def addInputStream(s: StreamingContext): DStream[Int] = {
Expand Down

0 comments on commit 847cfa8

Please sign in to comment.