From aa243cfe775a9895d8261f516ca2e38473ee26b1 Mon Sep 17 00:00:00 2001
From: Shuyang Sheng
Date: Mon, 24 Sep 2018 16:03:39 -0700
Subject: [PATCH 1/2] fix kinesis checkpoint recovery

---
 .../kinesis/KinesisInputDStream.scala         | 60 +++++++++------
 .../kinesis/KinesisStreamSuite.scala          | 74 +++++++++++++++++--
 2 files changed, 106 insertions(+), 28 deletions(-)

diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
index 1ffec01df9f00..b6531b4c06c29 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
@@ -29,8 +29,10 @@ import org.apache.spark.streaming.{Duration, StreamingContext, Time}
 import org.apache.spark.streaming.api.java.JavaStreamingContext
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
 import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
+import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
 import org.apache.spark.streaming.receiver.Receiver
 import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
+import org.apache.spark.streaming.util.WriteAheadLogUtils
 
 private[kinesis] class KinesisInputDStream[T: ClassTag](
     _ssc: StreamingContext,
@@ -51,29 +53,45 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
 
   private[streaming]
   override def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
-
-    // This returns true even for when blockInfos is empty
-    val allBlocksHaveRanges = blockInfos.map { _.metadataOption }.forall(_.nonEmpty)
-
-    if (allBlocksHaveRanges) {
-      // Create a KinesisBackedBlockRDD, even when there are no blocks
-      val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
-      val seqNumRanges = blockInfos.map {
-        _.metadataOption.get.asInstanceOf[SequenceNumberRanges] }.toArray
-      val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
-      logDebug(s"Creating KinesisBackedBlockRDD for $time with ${seqNumRanges.length} " +
+    val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
+    val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
+
+    // Check whether the receiver write-ahead log is enabled
+    if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
+      val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
+      if (areWALRecordHandlesPresent) {
+        // If all the blocks have WAL record handles, create a WriteAheadLogBackedBlockRDD
+        val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
+        logInfo(s"Creating WriteAheadLogBackedBlockRDD for $time.")
+
+        new WriteAheadLogBackedBlockRDD(
+          ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
+      } else {
+        logWarning("Some blocks do not have Write Ahead Log information; " +
+          "this is unexpected and data may not be recoverable after driver failures")
+        super.createBlockRDD(time, blockInfos)
+      }
+    } else {
+      // This returns true even when blockInfos is empty
+      val allBlocksHaveRanges = blockInfos.map { _.metadataOption }.forall(_.nonEmpty)
+      if (allBlocksHaveRanges) {
+        // Create a KinesisBackedBlockRDD, even when there are no blocks
+        val seqNumRanges = blockInfos.map {
+          _.metadataOption.get.asInstanceOf[SequenceNumberRanges] }.toArray
+        logDebug(s"Creating KinesisBackedBlockRDD for $time with ${seqNumRanges.length} " +
           s"seq number ranges: ${seqNumRanges.mkString(", ")} ")
-      new KinesisBackedBlockRDD(
-        context.sc, regionName, endpointUrl, blockIds, seqNumRanges,
-        isBlockIdValid = isBlockIdValid,
-        messageHandler = messageHandler,
-        kinesisCreds = kinesisCreds,
-        kinesisReadConfigs = KinesisReadConfigurations(ssc))
-    } else {
-      logWarning("Kinesis sequence number information was not present with some block metadata," +
-        " it may not be possible to recover from failures")
-      super.createBlockRDD(time, blockInfos)
+        new KinesisBackedBlockRDD(
+          context.sc, regionName, endpointUrl, blockIds, seqNumRanges,
+          isBlockIdValid = isBlockIdValid,
+          messageHandler = messageHandler,
+          kinesisCreds = kinesisCreds,
+          kinesisReadConfigs = KinesisReadConfigurations(ssc))
+      } else {
+        logWarning("Kinesis sequence number information was not present with some block metadata," +
+          " it may not be possible to recover from failures")
+        super.createBlockRDD(time, blockInfos)
+      }
     }
   }
 
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index a7a68eba910bf..ad31e6aab9848 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -37,12 +37,14 @@ import org.apache.spark.streaming.dstream.ReceiverInputDStream
 import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
 import org.apache.spark.streaming.kinesis.KinesisReadConfigurations._
 import org.apache.spark.streaming.kinesis.KinesisTestUtils._
-import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
+import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
+import org.apache.spark.streaming.receiver.{BlockManagerBasedStoreResult, WriteAheadLogBasedStoreResult}
 import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
+import org.apache.spark.streaming.util.{WriteAheadLogRecordHandle, WriteAheadLogUtils}
 import org.apache.spark.util.Utils
 
-abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFunSuite
-  with Eventually with BeforeAndAfter with BeforeAndAfterAll {
+abstract class KinesisStreamTests(aggregateTestData: Boolean, enableWAL: Boolean)
+  extends KinesisFunSuite with Eventually with BeforeAndAfter with BeforeAndAfterAll {
 
   // This is the name that KCL will use to save metadata to DynamoDB
   private val appName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}"
@@ -62,6 +64,8 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     val conf = new SparkConf()
       .setMaster("local[4]")
       .setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name
+    conf.set(WriteAheadLogUtils.RECEIVER_WAL_ENABLE_CONF_KEY, enableWAL.toString)
+    require(WriteAheadLogUtils.enableReceiverLog(conf) === enableWAL)
     sc = new SparkContext(conf)
 
     runIfTestsEnabled("Prepare KinesisTestUtils") {
@@ -99,6 +103,18 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     }
   }
 
+  private def testWithoutWAL(msg: String)(body: => Unit): Unit = {
+    test(s"Without WAL enabled: $msg") {
+      if (!enableWAL) body
+    }
+  }
+
+  private def testWithWAL(msg: String)(body: => Unit): Unit = {
+    test(s"With WAL enabled: $msg") {
+      if (enableWAL) body
+    }
+  }
+
   test("KinesisUtils API") {
     val kinesisStream1 = KinesisUtils.createStream(ssc, "myAppName", "mySparkStream",
       dummyEndpointUrl, dummyRegionName,
@@ -109,7 +125,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
       dummyAWSAccessKey, dummyAWSSecretKey)
   }
 
-  test("RDD generation") {
+  testWithoutWAL("KinesisBackedBlockRDD generation") {
     val inputStream = KinesisUtils.createStream(ssc, appName, "dummyStream",
       dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, Seconds(2),
       StorageLevel.MEMORY_AND_DISK_2, dummyAWSAccessKey, dummyAWSSecretKey)
@@ -164,6 +180,42 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     }
   }
 
+  testWithWAL("WriteAheadLogBackedBlockRDD generation") {
+    val inputStream = KinesisUtils.createStream(ssc, appName, "dummyStream",
+      dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, Seconds(2),
+      StorageLevel.MEMORY_AND_DISK_2, dummyAWSAccessKey, dummyAWSSecretKey)
+    assert(inputStream.isInstanceOf[KinesisInputDStream[Array[Byte]]])
+
+    val kinesisStream = inputStream.asInstanceOf[KinesisInputDStream[Array[Byte]]]
+    val time = Time(1000)
+
+    // Generate block info data for testing
+    val seqNumRanges1 = SequenceNumberRanges(
+      SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy", 67))
+    val blockId1 = StreamBlockId(kinesisStream.id, 123)
+    val blockInfo1 = ReceivedBlockInfo(
+      0, None, Some(seqNumRanges1),
+      new WriteAheadLogBasedStoreResult(blockId1, None, new WriteAheadLogRecordHandle { }))
+
+    val seqNumRanges2 = SequenceNumberRanges(
+      SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb", 89))
+    val blockId2 = StreamBlockId(kinesisStream.id, 345)
+    val blockInfo2 = ReceivedBlockInfo(
+      0, None, Some(seqNumRanges2),
+      new WriteAheadLogBasedStoreResult(blockId2, None, new WriteAheadLogRecordHandle { }))
+
+    // Verify that a WriteAheadLogBackedBlockRDD is generated when all block infos have WAL info
+    val blockInfos = Seq(blockInfo1, blockInfo2)
+    val nonEmptyRDD = kinesisStream.createBlockRDD(time, blockInfos)
+    nonEmptyRDD shouldBe a [WriteAheadLogBackedBlockRDD[_]]
+    val rdd = nonEmptyRDD.asInstanceOf[WriteAheadLogBackedBlockRDD[_]]
+    assert(rdd.walRecordHandles.toSeq === blockInfos.map { _.walRecordHandleOption.get })
+
+    // Verify that a WriteAheadLogBackedBlockRDD is generated even when there are no blocks
+    val emptyRDD = kinesisStream.createBlockRDD(time, Seq.empty)
+    assert(emptyRDD.isInstanceOf[WriteAheadLogBackedBlockRDD[_]])
+    assert(emptyRDD.isEmpty())
+  }
+
   /**
    * Test the stream by sending data to a Kinesis stream and receiving from it.
@@ -236,7 +288,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
     ssc.stop(stopSparkContext = false)
   }
 
-  test("Kinesis read with custom configurations") {
+  testWithoutWAL("Kinesis read with custom configurations") {
     try {
       ssc.sc.conf.set(RETRY_WAIT_TIME_KEY, "2000ms")
       ssc.sc.conf.set(RETRY_MAX_ATTEMPTS_KEY, "5")
@@ -431,6 +483,14 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
   }
 }
 
-class WithAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = true)
+class WithAggregationAndWithWALKinesisStreamSuite
+  extends KinesisStreamTests(aggregateTestData = true, enableWAL = true)
+
+class WithAggregationAndWithoutWALKinesisStreamSuite
+  extends KinesisStreamTests(aggregateTestData = true, enableWAL = false)
+
+class WithoutAggregationAndWithWALKinesisStreamSuite
+  extends KinesisStreamTests(aggregateTestData = false, enableWAL = true)
 
-class WithoutAggregationKinesisStreamSuite extends KinesisStreamTests(aggregateTestData = false)
+class WithoutAggregationAndWithoutWALKinesisStreamSuite
+  extends KinesisStreamTests(aggregateTestData = false, enableWAL = false)

From b8ca84ff807de4e2f7f691d40affb557cdccc58f Mon Sep 17 00:00:00 2001
From: Shuyang Sheng
Date: Tue, 25 Sep 2018 16:27:48 -0700
Subject: [PATCH 2/2] watermark the patch

---
 external/kinesis-asl/pom.xml                                      | 2 +-
 .../apache/spark/streaming/kinesis/KinesisInputDStream.scala      | 2 ++
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/external/kinesis-asl/pom.xml b/external/kinesis-asl/pom.xml
index 126624ecd8922..379f87dd6304e 100644
--- a/external/kinesis-asl/pom.xml
+++ b/external/kinesis-asl/pom.xml
@@ -25,7 +25,7 @@
     <relativePath>../../pom.xml</relativePath>
   </parent>
 
-  <artifactId>spark-streaming-kinesis-asl_2.11</artifactId>
+  <artifactId>spark-streaming-kinesis-asl-patch_2.11</artifactId>
   <packaging>jar</packaging>
   <name>Spark Kinesis Integration</name>
 
diff --git a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
index b6531b4c06c29..213bf58f04a8d 100644
--- a/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
+++ b/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala
@@ -34,6 +34,8 @@ import org.apache.spark.streaming.receiver.Receiver
 import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
 import org.apache.spark.streaming.util.WriteAheadLogUtils
 
+case class MyDog(name: String)
+
 private[kinesis] class KinesisInputDStream[T: ClassTag](
     _ssc: StreamingContext,
     val streamName: String,
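
Reviewer note (not part of the patches above): a minimal end-to-end sketch of the path
PATCH 1/2 targets. With the receiver write-ahead log enabled and a checkpoint directory
set, a restarted driver recovers batches through createBlockRDD, which after this fix
returns a WriteAheadLogBackedBlockRDD instead of assuming executor BlockManagers still
hold the blocks. The app name, stream name, region, endpoint, and checkpoint path below
are placeholders, and credentials are assumed to come from the default AWS provider
chain.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

object KinesisWALRecoveryExample {
  def main(args: Array[String]): Unit = {
    val checkpointDir = "hdfs:///tmp/kinesis-checkpoint" // placeholder path

    def createContext(): StreamingContext = {
      val conf = new SparkConf()
        .setAppName("KinesisWALRecoveryExample")
        // Log received blocks durably so createBlockRDD can rebuild them
        // from the WAL after a driver failure.
        .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      val ssc = new StreamingContext(conf, Seconds(2))
      ssc.checkpoint(checkpointDir)

      val stream = KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName("myKinesisStream") // placeholder stream
        .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
        .regionName("us-west-2")
        .initialPosition(new KinesisInitialPositions.Latest())
        .checkpointAppName("KinesisWALRecoveryExample")
        .checkpointInterval(Seconds(2))
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .build()

      // Any output action works here; counting records keeps the example small.
      stream.count().print()
      ssc
    }

    // The first run creates the context; a post-failure restart recovers it from
    // the checkpoint and replays blocks through the write-ahead log.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}

On the design: keeping the WAL branch separate from the sequence-number branch means the
WAL path never re-reads from Kinesis, so recovery does not depend on the records still
being within the stream's retention window.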