[SPARK-14676] Wrap and re-throw Await.result exceptions in order to capture full stacktrace

When `Await.result` throws an exception that originated from a different thread, the resulting stacktrace doesn't include the path leading to the `Await.result` call itself, making it difficult to identify the impact of these exceptions. For example, I've seen cases where broadcast cleaning errors propagate to the main thread and crash it, but the resulting stacktrace doesn't include any of the main thread's code, making it difficult to pinpoint which exception crashed that thread.
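To make the failure mode concrete, here is a minimal standalone sketch (not part of this patch) of how the waiting thread's frames get lost: `Await.result` re-throws the exception object that was created on the future's worker thread, so the stacktrace records only that thread's frames and never mentions `main` or the `Await.result` call site.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object LostStackTraceDemo {
  def main(args: Array[String]): Unit = {
    // The exception is constructed on an ExecutionContext worker thread,
    // so its stack trace captures only that thread's frames.
    val f = Future { throw new RuntimeException("failure on a worker thread") }
    // Await.result re-throws that same exception object: the printed
    // stacktrace shows the worker thread's frames, not main()'s.
    Await.result(f, 10.seconds)
  }
}
```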

This patch addresses this issue by explicitly catching, wrapping, and re-throwing exceptions that are thrown by `Await.result`.
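Concretely, the wrapping lives in a new `ThreadUtils.awaitResult` helper (added in this diff), and call sites swap it in for bare `Await.result`. A representative before/after, adapted from the `FutureActionSuite` change below (`sc` is assumed to be an active `SparkContext`):

```scala
import scala.concurrent.duration.Duration
import org.apache.spark.util.ThreadUtils

val job = sc.parallelize(1 to 10, 2).countAsync()

// Before: val res = Await.result(job, Duration.Inf)
// A failure would surface with only the remote/worker-side stacktrace.

// After: any NonFatal failure is caught and re-thrown as a SparkException
// constructed on this thread, so the caller's frames appear in the trace
// and the original exception is preserved as the cause.
val res = ThreadUtils.awaitResult(job, Duration.Inf)
```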

I tested this manually using JoshRosen@16b31c8, a patch that reproduces an issue where an RPC exception thrown while unpersisting RDDs crashes the main thread without any useful stacktrace, and verified that informative, full stacktraces were generated after applying the fix in this PR.

/cc rxin nongli yhuai anabranch

Author: Josh Rosen <joshrosen@databricks.com>

Closes #12433 from JoshRosen/wrap-and-rethrow-await-exceptions.
JoshRosen authored and rxin committed Apr 19, 2016
1 parent d9620e7 commit 947b902
Showing 28 changed files with 191 additions and 116 deletions.
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -28,6 +28,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.JavaFutureAction
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.JobWaiter
import org.apache.spark.util.ThreadUtils


/**
@@ -45,6 +46,7 @@ trait FutureAction[T] extends Future[T] {

/**
* Blocks until this action completes.
*
* @param atMost maximum wait time, which may be negative (no waiting is done), Duration.Inf
* for unbounded waiting, or a finite positive duration
* @return this FutureAction
@@ -53,6 +55,7 @@ trait FutureAction[T] extends Future[T] {

/**
* Awaits and returns the result (of type T) of this action.
*
* @param atMost maximum wait time, which may be negative (no waiting is done), Duration.Inf
* for unbounded waiting, or a finite positive duration
* @throws Exception exception during action execution
@@ -89,8 +92,8 @@ trait FutureAction[T] extends Future[T] {
/**
* Blocks and returns the result of this job.
*/
@throws(classOf[Exception])
def get(): T = Await.result(this, Duration.Inf)
@throws(classOf[SparkException])
def get(): T = ThreadUtils.awaitResult(this, Duration.Inf)

/**
* Returns the job IDs run by the underlying async operation.
core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeoutException

import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -35,7 +35,7 @@ import org.json4s.jackson.JsonMethods
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.master.RecoveryState
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
import org.apache.spark.util.{ThreadUtils, Utils}

/**
* This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
@@ -265,7 +265,7 @@ private object FaultToleranceTest extends App with Logging {
}

// Avoid waiting indefinitely (e.g., we could register but get no executors).
assertTrue(Await.result(f, 120 seconds))
assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
}

/**
@@ -318,7 +318,7 @@ private object FaultToleranceTest extends App with Logging {
}

try {
assertTrue(Await.result(f, 120 seconds))
assertTrue(ThreadUtils.awaitResult(f, 120 seconds))
} catch {
case e: TimeoutException =>
logError("Master states: " + masters.map(_.state))
@@ -422,7 +422,7 @@ private object SparkDocker {
}

dockerCmd.run(ProcessLogger(findIpAndLog _))
val ip = Await.result(ipPromise.future, 30 seconds)
val ip = ThreadUtils.awaitResult(ipPromise.future, 30 seconds)
val dockerId = Docker.getLastProcessId
(ip, dockerId, outFile)
}
core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -24,7 +24,7 @@ import java.util.Date
import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.language.postfixOps
import scala.util.Random
@@ -959,7 +959,7 @@ private[deploy] class Master(
*/
private[master] def rebuildSparkUI(app: ApplicationInfo): Option[SparkUI] = {
val futureUI = asyncRebuildSparkUI(app)
Await.result(futureUI, Duration.Inf)
ThreadUtils.awaitResult(futureUI, Duration.Inf)
}

/** Rebuild a new SparkUI asynchronously to not block RPC event loop */
core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -27,10 +27,11 @@ import scala.collection.mutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.io.Source
import scala.util.control.NonFatal

import com.fasterxml.jackson.core.JsonProcessingException

import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

@@ -258,13 +259,17 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
}
}

// scalastyle:off awaitresult
try { Await.result(responseFuture, 10.seconds) } catch {
// scalastyle:on awaitresult
case unreachable @ (_: FileNotFoundException | _: SocketException) =>
throw new SubmitRestConnectionException("Unable to connect to server", unreachable)
case malformed @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
throw new SubmitRestProtocolException("Malformed response received from server", malformed)
case timeout: TimeoutException =>
throw new SubmitRestConnectionException("No response from server", timeout)
case NonFatal(t) =>
throw new SparkException("Exception while waiting for response", t)
}
}

core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -20,14 +20,15 @@ package org.apache.spark.network
import java.io.Closeable
import java.nio.ByteBuffer

import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag

import org.apache.spark.internal.Logging
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.util.ThreadUtils

private[spark]
abstract class BlockTransferService extends ShuffleClient with Closeable with Logging {
@@ -100,8 +101,7 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Logging {
result.success(new NioManagedBuffer(ret))
}
})

Await.result(result.future, Duration.Inf)
ThreadUtils.awaitResult(result.future, Duration.Inf)
}

/**
@@ -119,6 +119,6 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Logging {
level: StorageLevel,
classTag: ClassTag[_]): Unit = {
val future = uploadBlock(hostname, port, execId, blockId, blockData, level, classTag)
Await.result(future, Duration.Inf)
ThreadUtils.awaitResult(future, Duration.Inf)
}
}
25 changes: 18 additions & 7 deletions core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
@@ -19,10 +19,11 @@ package org.apache.spark.rpc

import java.util.concurrent.TimeoutException

import scala.concurrent.{Await, Awaitable}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

import org.apache.spark.SparkConf
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.util.Utils

/**
@@ -65,14 +66,21 @@ private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: S
/**
* Wait for the completed result and return it. If the result is not available within this
* timeout, throw a [[RpcTimeoutException]] to indicate which configuration controls the timeout.
* @param awaitable the `Awaitable` to be awaited
* @throws RpcTimeoutException if after waiting for the specified time `awaitable`
*
* @param future the `Future` to be awaited
* @throws RpcTimeoutException if after waiting for the specified time `future`
* is still not ready
*/
def awaitResult[T](awaitable: Awaitable[T]): T = {
def awaitResult[T](future: Future[T]): T = {
val wrapAndRethrow: PartialFunction[Throwable, T] = {
case NonFatal(t) =>
throw new SparkException("Exception thrown in awaitResult", t)
}
try {
Await.result(awaitable, duration)
} catch addMessageIfTimeout
// scalastyle:off awaitresult
Await.result(future, duration)
// scalastyle:on awaitresult
} catch addMessageIfTimeout.orElse(wrapAndRethrow)
}
}

@@ -82,6 +90,7 @@ private[spark] object RpcTimeout {
/**
* Lookup the timeout property in the configuration and create
* a RpcTimeout with the property key in the description.
*
* @param conf configuration properties containing the timeout
* @param timeoutProp property key for the timeout in seconds
* @throws NoSuchElementException if property is not set
@@ -95,6 +104,7 @@
* Lookup the timeout property in the configuration and create
* a RpcTimeout with the property key in the description.
* Uses the given default value if property is not set
*
* @param conf configuration properties containing the timeout
* @param timeoutProp property key for the timeout in seconds
* @param defaultValue default timeout value in seconds if property not found
@@ -109,6 +119,7 @@
* and create a RpcTimeout with the first set property key in the
* description.
* Uses the given default value if property is not set
*
* @param conf configuration properties containing the timeout
* @param timeoutPropList prioritized list of property keys for the timeout in seconds
* @param defaultValue default timeout value in seconds if no properties found
14 changes: 12 additions & 2 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -260,7 +260,12 @@ private[spark] class BlockManager(
def waitForAsyncReregister(): Unit = {
val task = asyncReregisterTask
if (task != null) {
Await.ready(task, Duration.Inf)
try {
Await.ready(task, Duration.Inf)
} catch {
case NonFatal(t) =>
throw new Exception("Error occurred while waiting for async. reregistration", t)
}
}
}

@@ -802,7 +807,12 @@
logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
if (level.replication > 1) {
// Wait for asynchronous replication to finish
Await.ready(replicationFuture, Duration.Inf)
try {
Await.ready(replicationFuture, Duration.Inf)
} catch {
case NonFatal(t) =>
throw new Exception("Error occurred while waiting for replication to finish", t)
}
}
if (blockWasSuccessfullyStored) {
None
22 changes: 21 additions & 1 deletion core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -19,12 +19,15 @@ package org.apache.spark.util

import java.util.concurrent._

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.concurrent.{Await, Awaitable, ExecutionContext, ExecutionContextExecutor}
import scala.concurrent.duration.Duration
import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread}
import scala.util.control.NonFatal

import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}

import org.apache.spark.SparkException

private[spark] object ThreadUtils {

private val sameThreadExecutionContext =
@@ -174,4 +177,21 @@
false // asyncMode
)
}

// scalastyle:off awaitresult
/**
* Preferred alternative to [[Await.result()]]. This method wraps and re-throws any exceptions
* thrown by the underlying [[Await]] call, ensuring that this thread's stack trace appears in
* logs.
*/
@throws(classOf[SparkException])
def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
try {
Await.result(awaitable, atMost)
// scalastyle:on awaitresult
} catch {
case NonFatal(t) =>
throw new SparkException("Exception thrown in awaitResult: ", t)
}
}
}
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1598,6 +1598,7 @@ private[spark] object Utils extends Logging {

/**
* Timing method based on iterations that permit JVM JIT optimization.
*
* @param numIters number of iterations
* @param f function to be executed. If prepare is not None, the running time of each call to f
* must be an order of magnitude longer than one millisecond for accurate timing.
@@ -1639,6 +1640,7 @@

/**
* Creates a symlink.
*
* @param src absolute path to the source
* @param dst relative path for the destination
*/
7 changes: 4 additions & 3 deletions core/src/test/scala/org/apache/spark/FutureActionSuite.scala
@@ -17,11 +17,12 @@

package org.apache.spark

import scala.concurrent.Await
import scala.concurrent.duration.Duration

import org.scalatest.{BeforeAndAfter, Matchers}

import org.apache.spark.util.ThreadUtils


class FutureActionSuite
extends SparkFunSuite
@@ -36,15 +37,15 @@ class FutureActionSuite
test("simple async action") {
val rdd = sc.parallelize(1 to 10, 2)
val job = rdd.countAsync()
val res = Await.result(job, Duration.Inf)
val res = ThreadUtils.awaitResult(job, Duration.Inf)
res should be (10)
job.jobIds.size should be (1)
}

test("complex async action") {
val rdd = sc.parallelize(1 to 15, 3)
val job = rdd.takeAsync(10)
val res = Await.result(job, Duration.Inf)
val res = ThreadUtils.awaitResult(job, Duration.Inf)
res should be (1 to 10)
job.jobIds.size should be (2)
}
core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.{ExecutorService, TimeUnit}

import scala.collection.Map
import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps

@@ -36,7 +35,7 @@ import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.ManualClock
import org.apache.spark.util.{ManualClock, ThreadUtils}

/**
* A test suite for the heartbeating behavior between the driver and the executors.
@@ -231,14 +230,14 @@
private def addExecutorAndVerify(executorId: String): Unit = {
assert(
heartbeatReceiver.addExecutor(executorId).map { f =>
Await.result(f, 10.seconds)
ThreadUtils.awaitResult(f, 10.seconds)
} === Some(true))
}

private def removeExecutorAndVerify(executorId: String): Unit = {
assert(
heartbeatReceiver.removeExecutor(executorId).map { f =>
Await.result(f, 10.seconds)
ThreadUtils.awaitResult(f, 10.seconds)
} === Some(true))
}
