From 8c0f5d18b69674ba0e214c10ca2588a0f20cd008 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 24 Mar 2016 10:42:57 -0700 Subject: [PATCH 1/6] Add a workaround for HADOOP-10622 to fix DataFrameReaderWriterSuite There is a potential dead-lock in Hadoop Shell.runCommand before 2.5.0 ([HADOOP-10622](https://issues.apache.org/jira/browse/HADOOP-10622)). If we interrupt some thread running Shell.runCommand, we may hit this issue. This PR adds some protecion to prevent from interrupting the microBatchThread when we may run into Shell.runCommand. There are two places will call Shell.runCommand now: - offsetLog.add - FileStreamSource.getOffset They will create a file using HDFS API and call Shell.runCommand to set the file permission. --- .../execution/streaming/StreamExecution.scala | 74 +++++++++++++++++-- 1 file changed, 69 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 5abd7eca2c2e3..8ac7d345c42b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming import java.util.concurrent.{CountDownLatch, TimeUnit} import java.util.concurrent.atomic.AtomicInteger +import javax.annotation.concurrent.GuardedBy import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal @@ -101,6 +102,65 @@ class StreamExecution( private val offsetLog = new HDFSMetadataLog[CompositeOffset](sqlContext, checkpointFile("offsets")) + /** A monitor to protect "uninterruptible" and "interrupted" */ + private val uninterruptibleLock = new Object + + /** + * Indicates if "microBatchThread" are in the uninterruptible status. If so, interrupting + * "microBatchThread" will be deferred until "microBatchThread" enters into the interruptible + * status. + */ + @GuardedBy("uninterruptibleLock") + private var uninterruptible = false + + /** + * Indicates if we should interrupt "microBatchThread" when we are leaving the uninterruptible + * zone. + */ + @GuardedBy("uninterruptibleLock") + private var interrupted = false + + /** + * Interrupt "microBatchThread" if possible. If "microBatchThread" is in the uninterruptible + * status. "microBatchThread" won't be interrupted until it enters into the interruptible status. + */ + private def interruptMicroBatchThreadSafely(): Unit = { + synchronized { + if (uninterruptible == true) { + interrupted = true + } else { + microBatchThread.interrupt() + } + } + } + + /** + * Run `f` uninterruptibly in "microBatchThread". "microBatchThread" won't be interrupted before + * returning from `f`. + */ + private def runUninterruptiblyInMicroBatchThread[T](f: => T): T = { + assert(Thread.currentThread() == microBatchThread) + synchronized { + uninterruptible = true + // Clear the interrupted status if it's set. + if (Thread.interrupted()) { + interrupted = true + } + } + try { + f + } finally { + synchronized { + uninterruptible = false + if (interrupted) { + // Recover the interrupted status + microBatchThread.interrupt() + interrupted = false + } + } + } + } + /** Whether the query is currently active or not */ override def isActive: Boolean = state == ACTIVE @@ -228,13 +288,17 @@ class StreamExecution( committedOffsets ++= availableOffsets // Check to see what new data is available. - val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> o)) + val newData = runUninterruptiblyInMicroBatchThread { + uniqueSources.flatMap(s => s.getOffset.map(o => s -> o)) + } availableOffsets ++= newData if (dataAvailable) { - assert( - offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)), - s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId") + runUninterruptiblyInMicroBatchThread { + assert( + offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)), + s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId") + } currentBatchId += 1 logInfo(s"Committed offsets for batch $currentBatchId.") true @@ -320,7 +384,7 @@ class StreamExecution( // intentionally state = TERMINATED if (microBatchThread.isAlive) { - microBatchThread.interrupt() + interruptMicroBatchThreadSafely() microBatchThread.join() } logInfo(s"Query $name was stopped") From 7680aa057ca1d12347675072006dab934b35b645 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 23 Mar 2016 14:49:46 -0700 Subject: [PATCH 2/6] Reproduce DataFrameReaderWriterSuite failure --- dev/run-tests | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/dev/run-tests b/dev/run-tests index 257d1e8d50bb4..380eed25c1f6a 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -20,4 +20,8 @@ FWDIR="$(cd "`dirname $0`"/..; pwd)" cd "$FWDIR" -exec python -u ./dev/run-tests.py "$@" +set -e +for i in `seq 1 300`; do + build/sbt -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 "project sql" test +done +# exec python -u ./dev/run-tests.py "$@" From 69124c167db7aecdff456a04f699ac19b50dd4f0 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 24 Mar 2016 11:25:42 -0700 Subject: [PATCH 3/6] Address --- .../spark/sql/execution/streaming/StreamExecution.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 8ac7d345c42b4..d61c4d48b8445 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -125,8 +125,8 @@ class StreamExecution( * status. "microBatchThread" won't be interrupted until it enters into the interruptible status. */ private def interruptMicroBatchThreadSafely(): Unit = { - synchronized { - if (uninterruptible == true) { + uninterruptibleLock.synchronized { + if (uninterruptible) { interrupted = true } else { microBatchThread.interrupt() @@ -140,7 +140,7 @@ class StreamExecution( */ private def runUninterruptiblyInMicroBatchThread[T](f: => T): T = { assert(Thread.currentThread() == microBatchThread) - synchronized { + uninterruptibleLock.synchronized { uninterruptible = true // Clear the interrupted status if it's set. if (Thread.interrupted()) { @@ -150,7 +150,7 @@ class StreamExecution( try { f } finally { - synchronized { + uninterruptibleLock.synchronized { uninterruptible = false if (interrupted) { // Recover the interrupted status From d8dcd04c2a174af6f7f06bc2d004ec948b622d4a Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 24 Mar 2016 13:28:17 -0700 Subject: [PATCH 4/6] Revert "Reproduce DataFrameReaderWriterSuite failure" This reverts commit 7680aa057ca1d12347675072006dab934b35b645. --- dev/run-tests | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/dev/run-tests b/dev/run-tests index 380eed25c1f6a..257d1e8d50bb4 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -20,8 +20,4 @@ FWDIR="$(cd "`dirname $0`"/..; pwd)" cd "$FWDIR" -set -e -for i in `seq 1 300`; do - build/sbt -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 "project sql" test -done -# exec python -u ./dev/run-tests.py "$@" +exec python -u ./dev/run-tests.py "$@" From 9809acff6bc0b52ca36e02ad23b9e23593f6fd8a Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 25 Mar 2016 11:24:34 -0700 Subject: [PATCH 5/6] Add document --- .../sql/execution/streaming/StreamExecution.scala | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index d61c4d48b8445..0883ed3a9f95a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -122,7 +122,7 @@ class StreamExecution( /** * Interrupt "microBatchThread" if possible. If "microBatchThread" is in the uninterruptible - * status. "microBatchThread" won't be interrupted until it enters into the interruptible status. + * status, "microBatchThread" won't be interrupted until it enters into the interruptible status. */ private def interruptMicroBatchThreadSafely(): Unit = { uninterruptibleLock.synchronized { @@ -287,6 +287,12 @@ class StreamExecution( // Update committed offsets. committedOffsets ++= availableOffsets + // There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). + // If we interrupt some thread running Shell.runCommand, we may hit this issue. + // As "FileStreamSource.getOffset" will create a file using HDFS API and call "Shell.runCommand" + // to set the file permission, we should not interrupt "microBatchThread" when running this + // method. See SPARK-14131. + // // Check to see what new data is available. val newData = runUninterruptiblyInMicroBatchThread { uniqueSources.flatMap(s => s.getOffset.map(o => s -> o)) @@ -294,6 +300,11 @@ class StreamExecution( availableOffsets ++= newData if (dataAvailable) { + // There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). + // If we interrupt some thread running Shell.runCommand, we may hit this issue. + // As "offsetLog.add" will create a file using HDFS API and call "Shell.runCommand" to set + // the file permission, we should not interrupt "microBatchThread" when running this method. + // See SPARK-14131. runUninterruptiblyInMicroBatchThread { assert( offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)), From 45f1452419ffd126c14517af65a55a37347cffe5 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 25 Mar 2016 11:41:32 -0700 Subject: [PATCH 6/6] Rename --- .../sql/execution/streaming/StreamExecution.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 0883ed3a9f95a..60e00d203ccde 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -118,7 +118,7 @@ class StreamExecution( * zone. */ @GuardedBy("uninterruptibleLock") - private var interrupted = false + private var shouldInterruptThread = false /** * Interrupt "microBatchThread" if possible. If "microBatchThread" is in the uninterruptible @@ -127,7 +127,7 @@ class StreamExecution( private def interruptMicroBatchThreadSafely(): Unit = { uninterruptibleLock.synchronized { if (uninterruptible) { - interrupted = true + shouldInterruptThread = true } else { microBatchThread.interrupt() } @@ -144,7 +144,7 @@ class StreamExecution( uninterruptible = true // Clear the interrupted status if it's set. if (Thread.interrupted()) { - interrupted = true + shouldInterruptThread = true } } try { @@ -152,10 +152,10 @@ class StreamExecution( } finally { uninterruptibleLock.synchronized { uninterruptible = false - if (interrupted) { + if (shouldInterruptThread) { // Recover the interrupted status microBatchThread.interrupt() - interrupted = false + shouldInterruptThread = false } } }