From 1d2fc298284f6d553d78035f3095e5d2abe2a8a8 Mon Sep 17 00:00:00 2001
From: Efim Poberezkin
Date: Wed, 25 Apr 2018 17:01:29 +0400
Subject: [PATCH 1/3] Add max epoch backlog option to SQLConf

---
 .../org/apache/spark/sql/internal/SQLConf.scala  | 10 ++++++++++
 .../streaming/continuous/EpochCoordinator.scala  | 13 ++++++++++---
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 53a50305348fa..af2587d54c236 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1271,6 +1271,14 @@ object SQLConf {
     .intConf
     .createWithDefault(Int.MaxValue)
 
+  val MAX_EPOCH_BACKLOG = buildConf("spark.sql.streaming.continuous.maxEpochBacklog")
+    .internal()
+    .doc("The max number of epochs to be stored in queue to wait for late epochs. " +
+      "To prevent OOM, if this parameter is exceeded by the size of the queue, " +
+      "an error is reported and further epochs are not remembered until queue size decreases.")
+    .intConf
+    .createWithDefault(10000)
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
@@ -1641,6 +1649,8 @@ class SQLConf extends Serializable with Logging {
   def partitionOverwriteMode: PartitionOverwriteMode.Value =
     PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))
 
+  def maxEpochBacklog: Int = getConf(MAX_EPOCH_BACKLOG)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
index 8877ebeb26735..67270fa59ea4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
@@ -123,6 +123,8 @@ private[continuous] class EpochCoordinator(
     override val rpcEnv: RpcEnv)
   extends ThreadSafeRpcEndpoint with Logging {
 
+  private val maxEpochBacklog = session.sqlContext.conf.maxEpochBacklog
+
   private var queryWritesStopped: Boolean = false
 
   private var numReaderPartitions: Int = _
@@ -153,9 +155,14 @@ private[continuous] class EpochCoordinator(
       // If not, add the epoch being currently processed to epochs waiting to be committed,
       // otherwise commit it.
       if (lastCommittedEpoch != epoch - 1) {
-        logDebug(s"Epoch $epoch has received commits from all partitions " +
-          s"and is waiting for epoch ${epoch - 1} to be committed first.")
-        epochsWaitingToBeCommitted.add(epoch)
+        if (epochsWaitingToBeCommitted.size == maxEpochBacklog) {
+          logError("Epochs queue has reached maximum epoch backlog. " +
+            "Not remembering this and further epochs until size of the queue decreases.")
+        } else {
+          logDebug(s"Epoch $epoch has received commits from all partitions " +
+            s"and is waiting for epoch ${epoch - 1} to be committed first.")
+          epochsWaitingToBeCommitted.add(epoch)
+        }
       } else {
         commitEpoch(epoch, thisEpochCommits)
         lastCommittedEpoch = epoch

From 0919b3f7542aa0a807b0ac56e0da1366f347bb54 Mon Sep 17 00:00:00 2001
From: Efim Poberezkin
Date: Mon, 7 May 2018 14:11:21 +0400
Subject: [PATCH 2/3] Replace logging an error with throwing an exception

---
 .../continuous/ContinuousExecution.scala  | 24 ++++++++++++++++---
 .../continuous/EpochCoordinator.scala     | 12 ++++++++--
 2 files changed, 31 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 0e7d1019b9c8f..2110668c48a40 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -17,8 +17,10 @@
 
 package org.apache.spark.sql.execution.streaming.continuous
 
+import java.lang.Thread.UncaughtExceptionHandler
 import java.util.UUID
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicReference
 import java.util.function.UnaryOperator
 
 import scala.collection.JavaConverters._
@@ -233,9 +235,15 @@ class ContinuousExecution(
               }
               false
             } else if (isActive) {
-              currentBatchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
-              logInfo(s"New epoch $currentBatchId is starting.")
-              true
+              val maxBacklogExceeded = epochEndpoint.askSync[Boolean](CheckIfMaxBacklogIsExceeded)
+              if (maxBacklogExceeded) {
+                throw new IllegalStateException(
+                  "Size of the epochs queue has exceeded maximum allowed epoch backlog.")
+              } else {
+                currentBatchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
+                logInfo(s"New epoch $currentBatchId is starting.")
+                true
+              }
             } else {
               false
             }
@@ -248,7 +256,12 @@
       }
     }, s"epoch update thread for $prettyIdString")
 
+    val throwableReference: AtomicReference[Throwable] = new AtomicReference[Throwable]()
     try {
+      epochUpdateThread.setUncaughtExceptionHandler(new UncaughtExceptionHandler {
+        override def uncaughtException(thread: Thread, throwable: Throwable): Unit =
+          throwableReference.set(throwable)
+      })
       epochUpdateThread.setDaemon(true)
       epochUpdateThread.start()
 
@@ -268,6 +281,11 @@
       epochUpdateThread.interrupt()
       epochUpdateThread.join()
 
+      val throwable: Throwable = throwableReference.get()
+      if (throwable != null && throwable.isInstanceOf[IllegalStateException]) {
+        throw throwable.asInstanceOf[IllegalStateException]
+      }
+
       stopSources()
       sparkSession.sparkContext.cancelJobGroup(runId.toString)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
index 67270fa59ea4e..9595aab01fcec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
@@ -45,6 +45,11 @@ private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage
  */
 private[sql] case object StopContinuousExecutionWrites extends EpochCoordinatorMessage
 
+/**
+ * Returns boolean indicating if size of the epochs queue has exceeded maximum epoch backlog.
+ */
+private[sql] case object CheckIfMaxBacklogIsExceeded extends EpochCoordinatorMessage
+
 // Init messages
 /**
  * Set the reader and writer partition counts. Tasks may not be started until the coordinator
@@ -125,6 +130,7 @@ private[continuous] class EpochCoordinator(
 
   private val maxEpochBacklog = session.sqlContext.conf.maxEpochBacklog
 
+  private var maxEpochBacklogExceeded: Boolean = false
   private var queryWritesStopped: Boolean = false
 
   private var numReaderPartitions: Int = _
@@ -156,8 +162,7 @@
       // otherwise commit it.
       if (lastCommittedEpoch != epoch - 1) {
         if (epochsWaitingToBeCommitted.size == maxEpochBacklog) {
-          logError("Epochs queue has reached maximum epoch backlog. " +
-            "Not remembering this and further epochs until size of the queue decreases.")
+          maxEpochBacklogExceeded = true
         } else {
           logDebug(s"Epoch $epoch has received commits from all partitions " +
             s"and is waiting for epoch ${epoch - 1} to be committed first.")
@@ -253,5 +258,8 @@
     case StopContinuousExecutionWrites =>
       queryWritesStopped = true
       context.reply(())
+
+    case CheckIfMaxBacklogIsExceeded =>
+      context.reply(maxEpochBacklogExceeded)
   }
 }

From 88d727b61f1e52b84b5299b27e43249bcc77e662 Mon Sep 17 00:00:00 2001
From: Efim Poberezkin
Date: Tue, 22 May 2018 17:53:10 +0400
Subject: [PATCH 3/3] Change conf doc

---
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index af2587d54c236..9ab9d56c41720 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1274,8 +1274,8 @@ object SQLConf {
   val MAX_EPOCH_BACKLOG = buildConf("spark.sql.streaming.continuous.maxEpochBacklog")
     .internal()
     .doc("The max number of epochs to be stored in queue to wait for late epochs. " +
-      "To prevent OOM, if this parameter is exceeded by the size of the queue, " +
-      "an error is reported and further epochs are not remembered until queue size decreases.")
+      "If this parameter is exceeded by the size of the queue, stream is stopped with an error " +
+      "indicating too many epochs stacked up.")
     .intConf
     .createWithDefault(10000)
 
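
Reviewer note (not part of the patch series): the sketch below shows how the new option would be
exercised end to end once the three patches above are applied. Only the conf key
spark.sql.streaming.continuous.maxEpochBacklog, its default of 10000, and the failure behaviour
come from the patches; the rate source, console sink, trigger interval, checkpoint path, object
name, and the lowered limit of 100 are illustrative assumptions.

// Illustrative sketch: run a continuous query with the backlog limit lowered from its
// default of 10000 to 100. Source, sink, paths and the value 100 are assumptions.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object MaxEpochBacklogSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("max-epoch-backlog-sketch")
      // Internal conf added by PATCH 1/3.
      .config("spark.sql.streaming.continuous.maxEpochBacklog", "100")
      .getOrCreate()

    val query = spark.readStream
      .format("rate")            // source that supports continuous processing
      .load()
      .writeStream
      .format("console")         // sink that supports continuous processing
      .option("checkpointLocation", "/tmp/max-epoch-backlog-sketch")
      .trigger(Trigger.Continuous("1 second"))
      .start()

    // With PATCH 2/3 applied, once the queue of epochs waiting to be committed reaches the
    // configured limit, the query fails with an IllegalStateException from the epoch update
    // thread instead of only logging an error.
    query.awaitTermination()
  }
}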