From 0d0b3f34a90469ba894a207639456b4b815a90e8 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 7 Jul 2018 16:46:21 +0200 Subject: [PATCH 1/2] Improving the error message for broadcast timeouts I added a recommendation to either increase the broadcast timeout or disable broadcast joins. The following text is appended to the existing error message: ``` You can increase the timeout for broadcasts via ${SQLConf.BROADCAST_TIMEOUT.key} or disable broadcast join by setting ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1 ``` Author: Maxim Gekk Closes #2801 from MaxGekk/broadcast-error-message. --- .../execution/exchange/BroadcastExchangeExec.scala | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala index c55f9b8f1a7fc..4fe9752f3ca34 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala @@ -17,11 +17,14 @@ package org.apache.spark.sql.execution.exchange +import java.util.concurrent.TimeoutException + import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import scala.util.control.NonFatal import org.apache.spark.{broadcast, SparkException} + import org.apache.spark.launcher.SparkLauncher import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -140,7 +143,16 @@ case class BroadcastExchangeExec( } override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = { - ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]] + try { + ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]] + } catch { + case ex: TimeoutException => + logError(s"Could not execute broadcast in ${timeout.toSeconds} secs.", ex) + throw new SparkException(s"Could not execute broadcast in ${timeout.toSeconds} secs. 
" + + s"You can increase the timeout for broadcasts via ${SQLConf.BROADCAST_TIMEOUT.key} or " + + s"disable broadcast join by setting ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1", + ex) + } } } From 86587edca7c1345b8a3687877b598d8389fbd56b Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 7 Jul 2018 17:57:36 +0200 Subject: [PATCH 2/2] Remove empty line in imports --- .../spark/sql/execution/exchange/BroadcastExchangeExec.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala index 4fe9752f3ca34..a80673c705f1a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala @@ -24,7 +24,6 @@ import scala.concurrent.duration._ import scala.util.control.NonFatal import org.apache.spark.{broadcast, SparkException} - import org.apache.spark.launcher.SparkLauncher import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow