From 6cf1cd415e02e9eef4eee0c637804a612108837c Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 22 May 2015 00:02:06 -0700 Subject: [PATCH 1/2] Made KinesisReceiver.onStart non-blocking --- .../streaming/kinesis/KinesisReceiver.scala | 30 +++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 800202e9fb86a..0aff2f590ad9d 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -18,6 +18,8 @@ package org.apache.spark.streaming.kinesis import java.util.UUID +import scala.util.control.NonFatal + import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, BasicAWSCredentials, DefaultAWSCredentialsProviderChain} import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorFactory} import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker} @@ -98,6 +100,9 @@ private[kinesis] class KinesisReceiver( */ private var worker: Worker = null + /** Thread running the worker */ + private var workerThread: Thread = null + /** * This is called when the KinesisReceiver starts and must be non-blocking. * The KCL creates and manages the receiving/processing thread pool through Worker.run(). @@ -126,8 +131,19 @@ private[kinesis] class KinesisReceiver( } worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration) - worker.run() - + workerThread = new Thread() { + override def run(): Unit = { + try { + worker.run() + } catch { + case NonFatal(e) => + restart("Error running the KCL worker in Receiver", e) + } + } + } + workerThread.setName("Kinesis Receiver") + workerThread.setDaemon(true) + workerThread.start() logInfo(s"Started receiver with workerId $workerId") } @@ -137,10 +153,14 @@ private[kinesis] class KinesisReceiver( * The KCL will do its best to drain and checkpoint any in-flight records upon shutdown. */ override def onStop() { - if (worker != null) { - worker.shutdown() + if (workerThread != null) { + if (worker != null) { + worker.shutdown() + worker = null + } + workerThread.join() + workerThread = null logInfo(s"Stopped receiver for workerId $workerId") - worker = null } workerId = null } From 2584683543830a0d77d04a71ad079db4f2a77dd9 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 22 May 2015 17:21:47 -0700 Subject: [PATCH 2/2] Added receiver id in thread name --- .../org/apache/spark/streaming/kinesis/KinesisReceiver.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 0aff2f590ad9d..7dd8bfdc2a6db 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -141,7 +141,7 @@ private[kinesis] class KinesisReceiver( } } } - workerThread.setName("Kinesis Receiver") + workerThread.setName(s"Kinesis Receiver ${streamId}") workerThread.setDaemon(true) workerThread.start() logInfo(s"Started receiver with workerId $workerId")